This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit beb976a0ef3a3a6c1d488d4e3beeefaef1f545ce
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Tue Mar 10 16:42:49 2020 +0800

    KYLIN-4417 Use hash rather than random to avoid potential issue in ConvergeCuboidDataPartitioner
---
 .../engine/mr/steps/ConvergeCuboidDataPartitioner.java  | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
index 605905a..3a31318 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.util.Random;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -28,10 +26,12 @@ import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 
 import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 
 public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> 
implements Configurable {
 
-    private Random rand = new Random();
+    private static final HashFunction hashFunc = Hashing.murmur3_128();
 
     private Configuration conf;
     private boolean enableSharding;
@@ -40,12 +40,14 @@ public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> imple
 
     @Override
     public int getPartition(Text key, Text value, int numReduceTasks) {
+        long hash = hashFunc.hashBytes(key.getBytes()).asLong(); // key-derived hash replaces the former Random draw. NOTE(review): Text.getBytes() returns the whole backing array; confirm bytes past getLength() cannot reach the hash
+
         long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), 
 enableSharding);
         // the first numReduceBaseCuboid are for base cuboid
         if (cuboidID == baseCuboidID) {
-            return rand.nextInt(numReduceBaseCuboid);
+            return getRemainder(hash, numReduceBaseCuboid); // lands in [0, numReduceBaseCuboid) when numReduceBaseCuboid > 0
         } else {
-            return numReduceBaseCuboid + rand.nextInt(numReduceTasks - 
 numReduceBaseCuboid);
+            return numReduceBaseCuboid + getRemainder(hash, numReduceTasks - 
 numReduceBaseCuboid); // offset past the reducers reserved for the base cuboid
         }
     }
 
@@ -64,4 +66,9 @@ public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> imple
     public Configuration getConf() {
         return conf;
     }
+
+    private static int getRemainder(long val, int base) { // reduces val to a non-negative remainder modulo base (for base > 0)
+        int rem = (int) val % base; // the cast binds tighter than %, so val is narrowed to its low 32 bits before the remainder is taken
+        return rem >= 0 ? rem : rem + base; // Java's % keeps the dividend's sign; shift a negative remainder into [0, base)
+    }
 }

Reply via email to