This is an automated email from the ASF dual-hosted git repository.

gopalv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 84f766e  HIVE-21294: Vectorization: 1-reducer Shuffle can skip the object hash functions (Teddy Choi, reviewed by Gopal V)
84f766e is described below

commit 84f766e79f4d28dbd5b59067c40741fbff297aa7
Author: Teddy Choi <tc...@apache.org>
AuthorDate: Wed Mar 6 17:10:32 2019 -0800

    HIVE-21294: Vectorization: 1-reducer Shuffle can skip the object hash functions (Teddy Choi, reviewed by Gopal V)

    Signed-off-by: Gopal V <gop...@apache.org>
---
 .../VectorReduceSinkObjectHashOperator.java        | 65 ++++++++++------------
 1 file changed, 28 insertions(+), 37 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
index 767df21..ef5ca02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
@@ -64,6 +64,8 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
   protected int[] reduceSinkPartitionColumnMap;
   protected TypeInfo[] reduceSinkPartitionTypeInfos;
 
+  private boolean isSingleReducer;
+
   protected VectorExpression[] reduceSinkPartitionExpressions;
 
   // The above members are initialized by the constructor and must not be
@@ -119,6 +121,8 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
       reduceSinkPartitionTypeInfos = vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos();
       reduceSinkPartitionExpressions = vectorReduceSinkInfo.getReduceSinkPartitionExpressions();
     }
+
+    isSingleReducer = this.conf.getNumReducers() == 1;
   }
 
   private ObjectInspector[] getObjectInspectorArray(TypeInfo[] typeInfos) {
@@ -255,48 +259,35 @@ public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOp
 
       final int size = batch.size;
 
-      if (isEmptyBuckets) { // EmptyBuckets = true
-        if (isEmptyPartitions) { // isEmptyPartition = true
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            final int hashCode = nonPartitionRandom.nextInt();
-            postProcess(batch, batchIndex, tag, hashCode);
-          }
-        } else { // isEmptyPartition = false
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
-            final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors);
-            postProcess(batch, batchIndex, tag, hashCode);
+      for (int logical = 0; logical< size; logical++) {
+        final int batchIndex = (selectedInUse ? selected[logical] : logical);
+        int hashCode;
+        if (isEmptyPartitions) {
+          if (isSingleReducer) {
+            // Empty partition, single reducer -> constant hashCode
+            hashCode = 0;
+          } else {
+            // Empty partition, multiple reducers -> random hashCode
+            hashCode = nonPartitionRandom.nextInt();
           }
+        } else {
+          // Compute hashCode from partitions
+          partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
+          hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors);
         }
-      } else { // EmptyBuckets = false
-        if (isEmptyPartitions) { // isEmptyPartition = true
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
-            final int bucketNum = ObjectInspectorUtils.getBucketNumber(
-                hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets);
-            final int hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum;
-            if (bucketExpr != null) {
-              evaluateBucketExpr(batch, batchIndex, bucketNum);
-            }
-            postProcess(batch, batchIndex, tag, hashCode);
-          }
-        } else { // isEmptyPartition = false
-          for (int logical = 0; logical< size; logical++) {
-            final int batchIndex = (selectedInUse ? selected[logical] : logical);
-            partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
-            bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
-            final int bucketNum = ObjectInspectorUtils.getBucketNumber(
+
+        // Compute hashCode from buckets
+        if (!isEmptyBuckets) {
+          bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
+          final int bucketNum = ObjectInspectorUtils.getBucketNumber(
              hashFunc.apply(bucketFieldValues, bucketObjectInspectors), numBuckets);
-            final int hashCode = hashFunc.apply(partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum;
-            if (bucketExpr != null) {
-              evaluateBucketExpr(batch, batchIndex, bucketNum);
-            }
-            postProcess(batch, batchIndex, tag, hashCode);
+          if (bucketExpr != null) {
+            evaluateBucketExpr(batch, batchIndex, bucketNum);
          }
+          hashCode = hashCode * 31 + bucketNum;
        }
+
+        postProcess(batch, batchIndex, tag, hashCode);
      }
    } catch (Exception e) {
      throw new HiveException(e);
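For readers skimming the patch: the correctness argument for the constant hash code is that Hadoop-style hash partitioning assigns a record to reducer (hashCode & Integer.MAX_VALUE) % numReducers, so with numReducers == 1 every hash value, random or constant, lands on reducer 0. Below is a minimal, self-contained Java sketch (a hypothetical demo class, not part of this commit or of Hive) illustrating why the per-row Random.nextInt() call could be dropped:

    import java.util.Random;

    // Hypothetical demo, not Hive code: models the reducer selection that
    // makes a constant hash code safe when there is exactly one reducer.
    public class SingleReducerHashDemo {

      // Hadoop-style partitioner: mask the sign bit so negative hash codes
      // still map to a valid reducer index.
      static int reducerFor(int hashCode, int numReducers) {
        return (hashCode & Integer.MAX_VALUE) % numReducers;
      }

      public static void main(String[] args) {
        Random nonPartitionRandom = new Random();
        for (int row = 0; row < 5; row++) {
          int randomHash = nonPartitionRandom.nextInt();
          // With one reducer, the random hash and the constant 0 pick the
          // same (and only) reducer, so the Random call is wasted work.
          System.out.printf("hash=%11d -> reducer %d (constant 0 -> reducer %d)%n",
              randomHash, reducerFor(randomHash, 1), reducerFor(0, 1));
        }
      }
    }

With multiple reducers the random hash still matters (it spreads non-partitioned rows across reducers), which is why the patch keeps nonPartitionRandom.nextInt() for that branch.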