This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a88871e  HIVE-23975: Reuse evicted keys from aggregation buffers 
(Mustafa Iman via Rajesh Balamohan)
a88871e is described below

commit a88871e58ecc4d57629e91454d129d8669c06f41
Author: Mustafa Iman <mustafai...@gmail.com>
AuthorDate: Sun Aug 2 21:25:20 2020 -0700

    HIVE-23975: Reuse evicted keys from aggregation buffers (Mustafa Iman via 
Rajesh Balamohan)
    
    Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>
---
 .../hive/ql/exec/vector/VectorGroupByOperator.java |  26 +++-
 .../wrapper/VectorHashKeyWrapperGeneral.java       | 141 +++++++++++++++++++--
 2 files changed, 158 insertions(+), 9 deletions(-)

diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index b6cd405..f6b38d6 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -50,6 +50,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterF
 import 
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase;
 import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch;
+import 
org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperGeneral;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
@@ -297,6 +298,8 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
    */
    final class ProcessingModeHashAggregate extends ProcessingModeBase {
 
+    private Queue<KeyWrapper> reusableKeyWrapperBuffer;
+
     /**
      * The global key-aggregation hash map.
      */
@@ -405,6 +408,10 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
       }
       computeMemoryLimits();
       LOG.debug("using hash aggregation processing mode");
+
+      if (keyWrappersBatch.getVectorHashKeyWrappers()[0] instanceof 
VectorHashKeyWrapperGeneral) {
+        reusableKeyWrapperBuffer = new 
ArrayDeque<>(VectorizedRowBatch.DEFAULT_SIZE);
+      }
     }
 
     @VisibleForTesting
@@ -488,6 +495,9 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
     @Override
     public void close(boolean aborted) throws HiveException {
       reusableAggregationBufferRows.clear();
+      if (reusableKeyWrapperBuffer != null) {
+        reusableKeyWrapperBuffer.clear();
+      }
       if (!aborted) {
         flush(true);
       }
@@ -536,7 +546,8 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
           // is very important to clone the keywrapper, the one we have from 
our
           // keyWrappersBatch is going to be reset/reused on next batch.
           aggregationBuffer = allocateAggregationBuffer();
-          mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
+          KeyWrapper copyKeyWrapper = cloneKeyWrapper(kw);
+          mapKeysAggregationBuffers.put(copyKeyWrapper, aggregationBuffer);
           numEntriesHashTable++;
           numEntriesSinceCheck++;
         } else {
@@ -548,6 +559,16 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
       }
     }
 
+    private KeyWrapper cloneKeyWrapper(VectorHashKeyWrapperBase from) {
+      if (reusableKeyWrapperBuffer != null && reusableKeyWrapperBuffer.size() 
> 0) {
+        KeyWrapper keyWrapper = reusableKeyWrapperBuffer.poll();
+        from.copyKey(keyWrapper);
+        return keyWrapper;
+      } else {
+        return from.copyKey();
+      }
+    }
+
     /**
      * Computes the memory limits for hash table flush (spill).
      */
@@ -637,6 +658,9 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
           totalAccessCount -= bufferRow.getAccessCount();
           reusableAggregationBufferRows.add(bufferRow);
           bufferRow.resetAccessCount();
+          if (reusableKeyWrapperBuffer != null) {
+            reusableKeyWrapperBuffer.add(pair.getKey());
+          }
           iter.remove();
           --numEntriesHashTable;
           if (++entriesFlushed >= entriesToFlush) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java
index c605ce3..929bb0a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.wrapper;
 
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hive.common.util.Murmur3;
 
@@ -27,15 +28,12 @@ import java.util.Arrays;
 
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
-import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSetInfo;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 
 import com.google.common.base.Preconditions;
@@ -205,11 +203,6 @@ public class VectorHashKeyWrapperGeneral extends 
VectorHashKeyWrapperBase {
   @Override
   protected Object clone() {
     VectorHashKeyWrapperGeneral clone = new VectorHashKeyWrapperGeneral();
-    duplicateTo(clone);
-    return clone;
-  }
-
-  private void duplicateTo(VectorHashKeyWrapperGeneral clone) {
     clone.hashCtx = hashCtx;
     clone.keyCount = keyCount;
     clone.longValues = (longValues.length > 0) ? longValues.clone() : 
EMPTY_LONG_ARRAY;
@@ -262,6 +255,138 @@ public class VectorHashKeyWrapperGeneral extends 
VectorHashKeyWrapperBase {
 
     clone.hashcode = hashcode;
     assert clone.equals(this);
+
+    return clone;
+  }
+
+  private long[] copyInPlaceOrAllocate(long[] from, long[] to) {
+    if (from.length > 0) {
+      if (to != null && to.length == from.length) {
+        System.arraycopy(from, 0, to, 0, from.length);
+        return to;
+      } else {
+        return from.clone();
+      }
+    } else {
+      return EMPTY_LONG_ARRAY;
+    }
+  }
+
+  private double[] copyInPlaceOrAllocate(double[] from, double[] to) {
+    if (from.length > 0) {
+      if (to != null && to.length == from.length) {
+        System.arraycopy(from, 0, to, 0, from.length);
+        return to;
+      } else {
+        return from.clone();
+      }
+    } else {
+      return EMPTY_DOUBLE_ARRAY;
+    }
+  }
+
+  private boolean[] copyInPlaceOrAllocate(boolean[] from, boolean[] to) {
+    if (to != null && to.length == from.length) {
+      System.arraycopy(from, 0, to, 0, from.length);
+      return to;
+    } else {
+      return from.clone();
+    }
+  }
+
+  private HiveDecimalWritable[] copyInPlaceOrAllocate(HiveDecimalWritable[] 
from, HiveDecimalWritable[] to) {
+    if (from.length > 0) {
+      if (to == null || to.length != from.length) {
+        to = new HiveDecimalWritable[from.length];
+      }
+      for (int i = 0; i < from.length; i++) {
+        to[i] = new HiveDecimalWritable(from[i]);
+      }
+      return to;
+    } else {
+      return EMPTY_DECIMAL_ARRAY;
+    }
+  }
+
+  private Timestamp[] copyInPlaceOrAllocate(Timestamp[] from, Timestamp[] to) {
+    if (from.length > 0) {
+      if (to == null || to.length != from.length) {
+        to = new Timestamp[from.length];
+      }
+      for (int i = 0; i < from.length; i++) {
+        to[i] = (Timestamp) from[i].clone();
+      }
+      return to;
+    } else {
+      return EMPTY_TIMESTAMP_ARRAY;
+    }
+  }
+
+  @Override
+  public void copyKey(KeyWrapper oldWrapper) {
+    VectorHashKeyWrapperGeneral clone = (VectorHashKeyWrapperGeneral) 
oldWrapper;
+    clone.hashCtx = hashCtx;
+    clone.keyCount = keyCount;
+    clone.longValues = copyInPlaceOrAllocate(longValues, clone.longValues);
+    clone.doubleValues = copyInPlaceOrAllocate(doubleValues, 
clone.doubleValues);
+    clone.isNull = copyInPlaceOrAllocate(isNull, clone.isNull);
+    clone.decimalValues = copyInPlaceOrAllocate(decimalValues, 
clone.decimalValues);
+
+    if (byteLengths.length > 0) {
+      if (clone.byteLengths == null || clone.byteValues.length != 
byteValues.length) {
+        // byteValues and byteStarts are always the same length
+        clone.byteValues = new byte[byteValues.length][];
+        clone.byteStarts = new int[byteValues.length];
+        clone.byteLengths = byteLengths.clone();
+        for (int i = 0; i < byteValues.length; ++i) {
+          // avoid allocation/copy of nulls, because it is potentially expensive.
+          // branch instead.
+          if (byteLengths[i] != -1) {
+            clone.byteValues[i] = Arrays.copyOfRange(byteValues[i],
+                byteStarts[i], byteStarts[i] + byteLengths[i]);
+          }
+        }
+      } else {
+        System.arraycopy(byteLengths, 0, clone.byteLengths, 0, 
byteValues.length);
+        Arrays.fill(byteStarts, 0);
+        System.arraycopy(byteStarts, 0, clone.byteStarts, 0, 
byteValues.length);
+        for (int i = 0; i < byteValues.length; ++i) {
+          // avoid allocation/copy of nulls, because it is potentially expensive.
+          // branch instead.
+          if (byteLengths[i] != -1) {
+            if (clone.byteValues[i] != null && clone.byteValues[i].length >= 
byteValues[i].length) {
+              System.arraycopy(byteValues[i], byteStarts[i], 
clone.byteValues[i], 0, byteLengths[i]);
+            } else {
+              clone.byteValues[i] = Arrays.copyOfRange(byteValues[i],
+                  byteStarts[i], byteStarts[i] + byteLengths[i]);
+            }
+          }
+        }
+      }
+    } else {
+      clone.byteValues = EMPTY_BYTES_ARRAY;
+      clone.byteStarts = EMPTY_INT_ARRAY;
+      clone.byteLengths = EMPTY_INT_ARRAY;
+    }
+    clone.timestampValues = copyInPlaceOrAllocate(timestampValues, 
clone.timestampValues);
+    clone.intervalDayTimeValues = copyInPlaceOrAllocate(intervalDayTimeValues, 
clone.intervalDayTimeValues);
+
+    clone.hashcode = hashcode;
+    assert clone.equals(this);
+  }
+
+  private HiveIntervalDayTime[] copyInPlaceOrAllocate(HiveIntervalDayTime[] 
from, HiveIntervalDayTime[] to) {
+    if (from.length > 0) {
+      if (to == null || to.length != from.length) {
+        to = new HiveIntervalDayTime[from.length];
+      }
+      for (int i = 0; i < from.length; i++) {
+        to[i] = (HiveIntervalDayTime) from[i].clone();
+      }
+      return to;
+    } else {
+      return EMPTY_INTERVAL_DAY_TIME_ARRAY;
+    }
   }
 
   @Override

Reply via email to