This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit beb976a0ef3a3a6c1d488d4e3beeefaef1f545ce
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Tue Mar 10 16:42:49 2020 +0800

    KYLIN-4417 Use hash rather than random to avoid potential issue in ConvergeCuboidDataPartitioner
---
 .../engine/mr/steps/ConvergeCuboidDataPartitioner.java | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
index 605905a..3a31318 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.util.Random;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -28,10 +26,12 @@ import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 
 import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 
 public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> implements Configurable {
 
-    private Random rand = new Random();
+    private static final HashFunction hashFunc = Hashing.murmur3_128();
 
     private Configuration conf;
     private boolean enableSharding;
@@ -40,12 +40,14 @@ public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> imple
 
     @Override
     public int getPartition(Text key, Text value, int numReduceTasks) {
+        long hash = hashFunc.hashBytes(key.getBytes()).asLong();
+
         long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
         // the first numReduceBaseCuboid are for base cuboid
         if (cuboidID == baseCuboidID) {
-            return rand.nextInt(numReduceBaseCuboid);
+            return getRemainder(hash, numReduceBaseCuboid);
         } else {
-            return numReduceBaseCuboid + rand.nextInt(numReduceTasks - numReduceBaseCuboid);
+            return numReduceBaseCuboid + getRemainder(hash, numReduceTasks - numReduceBaseCuboid);
         }
     }
 
@@ -64,4 +66,9 @@ public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> imple
     public Configuration getConf() {
         return conf;
     }
+
+    private static int getRemainder(long val, int base) {
+        int rem = (int) val % base;
+        return rem >= 0 ? rem : rem + base;
+    }
 }