This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a88871e HIVE-23975: Reuse evicted keys from aggregation buffers
(Mustafa Iman via Rajesh Balamohan)
a88871e is described below
commit a88871e58ecc4d57629e91454d129d8669c06f41
Author: Mustafa Iman <[email protected]>
AuthorDate: Sun Aug 2 21:25:20 2020 -0700
HIVE-23975: Reuse evicted keys from aggregation buffers (Mustafa Iman via
Rajesh Balamohan)
Signed-off-by: Ashutosh Chauhan <[email protected]>
---
.../hive/ql/exec/vector/VectorGroupByOperator.java | 26 +++-
.../wrapper/VectorHashKeyWrapperGeneral.java | 141 +++++++++++++++++++--
2 files changed, 158 insertions(+), 9 deletions(-)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index b6cd405..f6b38d6 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -50,6 +50,7 @@ import
org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterF
import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase;
import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch;
+import
org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperGeneral;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
@@ -297,6 +298,8 @@ public class VectorGroupByOperator extends
Operator<GroupByDesc>
*/
final class ProcessingModeHashAggregate extends ProcessingModeBase {
+ private Queue<KeyWrapper> reusableKeyWrapperBuffer;
+
/**
* The global key-aggregation hash map.
*/
@@ -405,6 +408,10 @@ public class VectorGroupByOperator extends
Operator<GroupByDesc>
}
computeMemoryLimits();
LOG.debug("using hash aggregation processing mode");
+
+ if (keyWrappersBatch.getVectorHashKeyWrappers()[0] instanceof
VectorHashKeyWrapperGeneral) {
+ reusableKeyWrapperBuffer = new
ArrayDeque<>(VectorizedRowBatch.DEFAULT_SIZE);
+ }
}
@VisibleForTesting
@@ -488,6 +495,9 @@ public class VectorGroupByOperator extends
Operator<GroupByDesc>
@Override
public void close(boolean aborted) throws HiveException {
reusableAggregationBufferRows.clear();
+ if (reusableKeyWrapperBuffer != null) {
+ reusableKeyWrapperBuffer.clear();
+ }
if (!aborted) {
flush(true);
}
@@ -536,7 +546,8 @@ public class VectorGroupByOperator extends
Operator<GroupByDesc>
// is very important to clone the keywrapper, the one we have from
our
// keyWrappersBatch is going to be reset/reused on next batch.
aggregationBuffer = allocateAggregationBuffer();
- mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
+ KeyWrapper copyKeyWrapper = cloneKeyWrapper(kw);
+ mapKeysAggregationBuffers.put(copyKeyWrapper, aggregationBuffer);
numEntriesHashTable++;
numEntriesSinceCheck++;
} else {
@@ -548,6 +559,16 @@ public class VectorGroupByOperator extends
Operator<GroupByDesc>
}
}
+ private KeyWrapper cloneKeyWrapper(VectorHashKeyWrapperBase from) {
+ if (reusableKeyWrapperBuffer != null && reusableKeyWrapperBuffer.size()
> 0) {
+ KeyWrapper keyWrapper = reusableKeyWrapperBuffer.poll();
+ from.copyKey(keyWrapper);
+ return keyWrapper;
+ } else {
+ return from.copyKey();
+ }
+ }
+
/**
* Computes the memory limits for hash table flush (spill).
*/
@@ -637,6 +658,9 @@ public class VectorGroupByOperator extends
Operator<GroupByDesc>
totalAccessCount -= bufferRow.getAccessCount();
reusableAggregationBufferRows.add(bufferRow);
bufferRow.resetAccessCount();
+ if (reusableKeyWrapperBuffer != null) {
+ reusableKeyWrapperBuffer.add(pair.getKey());
+ }
iter.remove();
--numEntriesHashTable;
if (++entriesFlushed >= entriesToFlush) {
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java
index c605ce3..929bb0a 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector.wrapper;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hive.common.util.Murmur3;
@@ -27,15 +28,12 @@ import java.util.Arrays;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
-import org.apache.hadoop.hive.ql.exec.KeyWrapper;
import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSetInfo;
import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import com.google.common.base.Preconditions;
@@ -205,11 +203,6 @@ public class VectorHashKeyWrapperGeneral extends
VectorHashKeyWrapperBase {
@Override
protected Object clone() {
VectorHashKeyWrapperGeneral clone = new VectorHashKeyWrapperGeneral();
- duplicateTo(clone);
- return clone;
- }
-
- private void duplicateTo(VectorHashKeyWrapperGeneral clone) {
clone.hashCtx = hashCtx;
clone.keyCount = keyCount;
clone.longValues = (longValues.length > 0) ? longValues.clone() :
EMPTY_LONG_ARRAY;
@@ -262,6 +255,138 @@ public class VectorHashKeyWrapperGeneral extends
VectorHashKeyWrapperBase {
clone.hashcode = hashcode;
assert clone.equals(this);
+
+ return clone;
+ }
+
+ private long[] copyInPlaceOrAllocate(long[] from, long[] to) {
+ if (from.length > 0) {
+ if (to != null && to.length == from.length) {
+ System.arraycopy(from, 0, to, 0, from.length);
+ return to;
+ } else {
+ return from.clone();
+ }
+ } else {
+ return EMPTY_LONG_ARRAY;
+ }
+ }
+
+ private double[] copyInPlaceOrAllocate(double[] from, double[] to) {
+ if (from.length > 0) {
+ if (to != null && to.length == from.length) {
+ System.arraycopy(from, 0, to, 0, from.length);
+ return to;
+ } else {
+ return from.clone();
+ }
+ } else {
+ return EMPTY_DOUBLE_ARRAY;
+ }
+ }
+
+ private boolean[] copyInPlaceOrAllocate(boolean[] from, boolean[] to) {
+ if (to != null && to.length == from.length) {
+ System.arraycopy(from, 0, to, 0, from.length);
+ return to;
+ } else {
+ return from.clone();
+ }
+ }
+
+ private HiveDecimalWritable[] copyInPlaceOrAllocate(HiveDecimalWritable[]
from, HiveDecimalWritable[] to) {
+ if (from.length > 0) {
+ if (to == null || to.length != from.length) {
+ to = new HiveDecimalWritable[from.length];
+ }
+ for (int i = 0; i < from.length; i++) {
+ to[i] = new HiveDecimalWritable(from[i]);
+ }
+ return to;
+ } else {
+ return EMPTY_DECIMAL_ARRAY;
+ }
+ }
+
+ private Timestamp[] copyInPlaceOrAllocate(Timestamp[] from, Timestamp[] to) {
+ if (from.length > 0) {
+ if (to == null || to.length != from.length) {
+ to = new Timestamp[from.length];
+ }
+ for (int i = 0; i < from.length; i++) {
+ to[i] = (Timestamp) from[i].clone();
+ }
+ return to;
+ } else {
+ return EMPTY_TIMESTAMP_ARRAY;
+ }
+ }
+
+ @Override
+ public void copyKey(KeyWrapper oldWrapper) {
+ VectorHashKeyWrapperGeneral clone = (VectorHashKeyWrapperGeneral)
oldWrapper;
+ clone.hashCtx = hashCtx;
+ clone.keyCount = keyCount;
+ clone.longValues = copyInPlaceOrAllocate(longValues, clone.longValues);
+ clone.doubleValues = copyInPlaceOrAllocate(doubleValues,
clone.doubleValues);
+ clone.isNull = copyInPlaceOrAllocate(isNull, clone.isNull);
+ clone.decimalValues = copyInPlaceOrAllocate(decimalValues,
clone.decimalValues);
+
+ if (byteLengths.length > 0) {
+ if (clone.byteLengths == null || clone.byteValues.length !=
byteValues.length) {
+ // byteValues and byteStarts are always the same length
+ clone.byteValues = new byte[byteValues.length][];
+ clone.byteStarts = new int[byteValues.length];
+ clone.byteLengths = byteLengths.clone();
+ for (int i = 0; i < byteValues.length; ++i) {
+          // avoid allocation/copy of nulls, because it is potentially expensive.
+ // branch instead.
+ if (byteLengths[i] != -1) {
+ clone.byteValues[i] = Arrays.copyOfRange(byteValues[i],
+ byteStarts[i], byteStarts[i] + byteLengths[i]);
+ }
+ }
+ } else {
+ System.arraycopy(byteLengths, 0, clone.byteLengths, 0,
byteValues.length);
+ Arrays.fill(byteStarts, 0);
+ System.arraycopy(byteStarts, 0, clone.byteStarts, 0,
byteValues.length);
+ for (int i = 0; i < byteValues.length; ++i) {
+          // avoid allocation/copy of nulls, because it is potentially expensive.
+ // branch instead.
+ if (byteLengths[i] != -1) {
+ if (clone.byteValues[i] != null && clone.byteValues[i].length >=
byteValues[i].length) {
+ System.arraycopy(byteValues[i], byteStarts[i],
clone.byteValues[i], 0, byteLengths[i]);
+ } else {
+ clone.byteValues[i] = Arrays.copyOfRange(byteValues[i],
+ byteStarts[i], byteStarts[i] + byteLengths[i]);
+ }
+ }
+ }
+ }
+ } else {
+ clone.byteValues = EMPTY_BYTES_ARRAY;
+ clone.byteStarts = EMPTY_INT_ARRAY;
+ clone.byteLengths = EMPTY_INT_ARRAY;
+ }
+ clone.timestampValues = copyInPlaceOrAllocate(timestampValues,
clone.timestampValues);
+ clone.intervalDayTimeValues = copyInPlaceOrAllocate(intervalDayTimeValues,
clone.intervalDayTimeValues);
+
+ clone.hashcode = hashcode;
+ assert clone.equals(this);
+ }
+
+ private HiveIntervalDayTime[] copyInPlaceOrAllocate(HiveIntervalDayTime[]
from, HiveIntervalDayTime[] to) {
+ if (from.length > 0) {
+ if (to == null || to.length != from.length) {
+ to = new HiveIntervalDayTime[from.length];
+ }
+ for (int i = 0; i < from.length; i++) {
+ to[i] = (HiveIntervalDayTime) from[i].clone();
+ }
+ return to;
+ } else {
+ return EMPTY_INTERVAL_DAY_TIME_ARRAY;
+ }
}
@Override