This is an automated email from the ASF dual-hosted git repository.

gopalv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 84f766e  HIVE-21294: Vectorization: 1-reducer Shuffle can skip the object hash functions (Teddy Choi, reviewed by Gopal V)
84f766e is described below

commit 84f766e79f4d28dbd5b59067c40741fbff297aa7
Author: Teddy Choi <tc...@apache.org>
AuthorDate: Wed Mar 6 17:10:32 2019 -0800

    HIVE-21294: Vectorization: 1-reducer Shuffle can skip the object hash functions (Teddy Choi, reviewed by Gopal V)
    
    Signed-off-by: Gopal V <gop...@apache.org>
---
 .../VectorReduceSinkObjectHashOperator.java        | 65 ++++++++++------------
 1 file changed, 28 insertions(+), 37 deletions(-)
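
For context on why a constant hash is safe here: the row hash only matters for picking a target reducer on the shuffle edge. A minimal sketch, assuming the usual hash-partitioner style routing; the class and method names below are illustrative and are not taken from Hive:

    // Illustrative only -- typical hash-based reducer selection.
    // When numReducers == 1, every hashCode maps to reducer 0, so the hash
    // value is irrelevant and computing it per row is wasted work.
    final class ReducerRoutingSketch {
      static int chooseReducer(int hashCode, int numReducers) {
        return (hashCode & Integer.MAX_VALUE) % numReducers;
      }
    }

That is what the patch below exploits: a new isSingleReducer flag (conf.getNumReducers() == 1) lets the empty-partition case emit a constant hashCode of 0 instead of calling nonPartitionRandom.nextInt() for every row.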

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
index 767df21..ef5ca02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
@@ -64,6 +64,8 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
   protected int[] reduceSinkPartitionColumnMap;
   protected TypeInfo[] reduceSinkPartitionTypeInfos;
 
+  private boolean isSingleReducer;
+
   protected VectorExpression[] reduceSinkPartitionExpressions;
 
   // The above members are initialized by the constructor and must not be
@@ -119,6 +121,8 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
       reduceSinkPartitionTypeInfos = vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos();
       reduceSinkPartitionExpressions = vectorReduceSinkInfo.getReduceSinkPartitionExpressions();
     }
+
+    isSingleReducer = this.conf.getNumReducers() == 1;
   }
 
   private ObjectInspector[] getObjectInspectorArray(TypeInfo[] typeInfos) {
@@ -255,48 +259,35 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
 
       final int size = batch.size;
 
-      if (isEmptyBuckets) { // EmptyBuckets = true
-        if (isEmptyPartitions) { // isEmptyPartition = true
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            final int hashCode = nonPartitionRandom.nextInt();
-            postProcess(batch, batchIndex, tag, hashCode);
-          }
-        } else { // isEmptyPartition = false
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
-            final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors);
-            postProcess(batch, batchIndex, tag, hashCode);
+      for (int logical = 0; logical< size; logical++) {
+        final int batchIndex = (selectedInUse ? selected[logical] : logical);
+        int hashCode;
+        if (isEmptyPartitions) {
+          if (isSingleReducer) {
+            // Empty partition, single reducer -> constant hashCode
+            hashCode = 0;
+          } else {
+            // Empty partition, multiple reducers -> random hashCode
+            hashCode = nonPartitionRandom.nextInt();
           }
+        } else {
+          // Compute hashCode from partitions
+          partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
+          hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors);
         }
-      } else { // EmptyBuckets = false
-        if (isEmptyPartitions) { // isEmptyPartition = true
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
-            final int bucketNum = ObjectInspectorUtils.getBucketNumber(
-              hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets);
-            final int hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum;
-            if (bucketExpr != null) {
-              evaluateBucketExpr(batch, batchIndex, bucketNum);
-            }
-            postProcess(batch, batchIndex, tag, hashCode);
-          }
-        } else { // isEmptyPartition = false
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
-            bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
-            final int bucketNum = ObjectInspectorUtils.getBucketNumber(
+
+        // Compute hashCode from buckets
+        if (!isEmptyBuckets) {
+          bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
+          final int bucketNum = ObjectInspectorUtils.getBucketNumber(
               hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets);
-            final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum;
-            if (bucketExpr != null) {
-              evaluateBucketExpr(batch, batchIndex, bucketNum);
-            }
-            postProcess(batch, batchIndex, tag, hashCode);
+          if (bucketExpr != null) {
+            evaluateBucketExpr(batch, batchIndex, bucketNum);
           }
+          hashCode = hashCode * 31 + bucketNum;
         }
+
+        postProcess(batch, batchIndex, tag, hashCode);
       }
     } catch (Exception e) {
       throw new HiveException(e);
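
Worth spelling out, since the commit message only calls out the 1-reducer shuffle: the collapsed loop keeps the bucket combination as hashCode * 31 + bucketNum, so for a bucketed sink with no partition keys the per-row hash becomes just the bucket number when there is a single reducer, and stays random * 31 + bucketNum otherwise, as before. A condensed sketch of the new per-row hash selection; RowHashSketch, rowHashCode, partitionHash and randomHash are hypothetical names for this illustration, while the boolean flags mirror the fields in the patch:

    // Hypothetical condensation of the new control flow, not the operator's code.
    final class RowHashSketch {
      static int rowHashCode(boolean isEmptyPartitions, boolean isSingleReducer,
          boolean isEmptyBuckets, int partitionHash, int randomHash, int bucketNum) {
        int hashCode;
        if (isEmptyPartitions) {
          // No partition keys: constant for a single reducer, random otherwise.
          hashCode = isSingleReducer ? 0 : randomHash;
        } else {
          // Otherwise the hash of the extracted partition key values.
          hashCode = partitionHash;
        }
        if (!isEmptyBuckets) {
          // Bucket number folded in exactly as before the refactor.
          hashCode = hashCode * 31 + bucketNum;
        }
        return hashCode;
      }
    }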
