This is an automated email from the ASF dual-hosted git repository. hashutosh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new a88871e HIVE-23975: Reuse evicted keys from aggregation buffers (Mustafa Iman via Rajesh Balamohan) a88871e is described below commit a88871e58ecc4d57629e91454d129d8669c06f41 Author: Mustafa Iman <mustafai...@gmail.com> AuthorDate: Sun Aug 2 21:25:20 2020 -0700 HIVE-23975: Reuse evicted keys from aggregation buffers (Mustafa Iman via Rajesh Balamohan) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> --- .../hive/ql/exec/vector/VectorGroupByOperator.java | 26 +++- .../wrapper/VectorHashKeyWrapperGeneral.java | 141 +++++++++++++++++++-- 2 files changed, 158 insertions(+), 9 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index b6cd405..f6b38d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterF import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBase; import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperBatch; +import org.apache.hadoop.hive.ql.exec.vector.wrapper.VectorHashKeyWrapperGeneral; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.GroupByDesc; @@ -297,6 +298,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> */ final class ProcessingModeHashAggregate extends ProcessingModeBase { + /* Key wrappers evicted from the hash table, handed back out by cloneKeyWrapper(); stays null unless the key wrappers are VectorHashKeyWrapperGeneral. */ private Queue<KeyWrapper> reusableKeyWrapperBuffer; + /** * The global key-aggregation hash map.
*/ @@ -405,6 +408,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> } computeMemoryLimits(); LOG.debug("using hash aggregation processing mode"); + + /* Recycling is enabled only for VectorHashKeyWrapperGeneral keys, whose copyKey(KeyWrapper) overwrites an existing wrapper in place. */ if (keyWrappersBatch.getVectorHashKeyWrappers()[0] instanceof VectorHashKeyWrapperGeneral) { + reusableKeyWrapperBuffer = new ArrayDeque<>(VectorizedRowBatch.DEFAULT_SIZE); + } } @VisibleForTesting @@ -488,6 +495,9 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> @Override public void close(boolean aborted) throws HiveException { reusableAggregationBufferRows.clear(); + if (reusableKeyWrapperBuffer != null) { + reusableKeyWrapperBuffer.clear(); + } if (!aborted) { flush(true); } @@ -536,7 +546,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> // is very important to clone the keywrapper, the one we have from our // keyWrappersBatch is going to be reset/reused on next batch. aggregationBuffer = allocateAggregationBuffer(); - mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer); + KeyWrapper copyKeyWrapper = cloneKeyWrapper(kw); + mapKeysAggregationBuffers.put(copyKeyWrapper, aggregationBuffer); numEntriesHashTable++; numEntriesSinceCheck++; } else { @@ -548,6 +559,16 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> } } + /* Returns an independent copy of from for use as a hash-map key: a recycled wrapper overwritten in place when one is queued, otherwise a freshly allocated from.copyKey(). */ private KeyWrapper cloneKeyWrapper(VectorHashKeyWrapperBase from) { + if (reusableKeyWrapperBuffer != null && reusableKeyWrapperBuffer.size() > 0) { + KeyWrapper keyWrapper = reusableKeyWrapperBuffer.poll(); + from.copyKey(keyWrapper); + return keyWrapper; + } else { + return from.copyKey(); + } + } + /** * Computes the memory limits for hash table flush (spill).
*/ @@ -637,6 +658,9 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> totalAccessCount -= bufferRow.getAccessCount(); reusableAggregationBufferRows.add(bufferRow); bufferRow.resetAccessCount(); + /* Queue the evicted key so cloneKeyWrapper() can overwrite it instead of allocating. NOTE(review): assumes nothing still references this key after it has been written out - confirm. */ if (reusableKeyWrapperBuffer != null) { + reusableKeyWrapperBuffer.add(pair.getKey()); + } iter.remove(); --numEntriesHashTable; if (++entriesFlushed >= entriesToFlush) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java index c605ce3..929bb0a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/wrapper/VectorHashKeyWrapperGeneral.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.vector.wrapper; +import org.apache.hadoop.hive.ql.exec.KeyWrapper; import org.apache.hadoop.hive.serde2.io.DateWritableV2; import org.apache.hive.common.util.Murmur3; @@ -27,15 +28,12 @@ import java.util.Arrays; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; -import org.apache.hadoop.hive.ql.exec.KeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSetInfo; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import com.google.common.base.Preconditions; @@ -205,11 +203,6 @@ public class VectorHashKeyWrapperGeneral extends VectorHashKeyWrapperBase { @Override protected Object
clone() { VectorHashKeyWrapperGeneral clone = new VectorHashKeyWrapperGeneral(); - duplicateTo(clone); - return clone; - } - - private void duplicateTo(VectorHashKeyWrapperGeneral clone) { clone.hashCtx = hashCtx; clone.keyCount = keyCount; clone.longValues = (longValues.length > 0) ? longValues.clone() : EMPTY_LONG_ARRAY; @@ -262,6 +255,138 @@ public class VectorHashKeyWrapperGeneral extends VectorHashKeyWrapperBase { clone.hashcode = hashcode; assert clone.equals(this); + + return clone; + } + + /* Copies from into to when to already has from's length, otherwise returns a clone; an empty input yields the shared EMPTY_LONG_ARRAY. */ private long[] copyInPlaceOrAllocate(long[] from, long[] to) { + if (from.length > 0) { + if (to != null && to.length == from.length) { + System.arraycopy(from, 0, to, 0, from.length); + return to; + } else { + return from.clone(); + } + } else { + return EMPTY_LONG_ARRAY; + } + } + + /* Same contract as the long[] overload; an empty input yields the shared EMPTY_DOUBLE_ARRAY. */ private double[] copyInPlaceOrAllocate(double[] from, double[] to) { + if (from.length > 0) { + if (to != null && to.length == from.length) { + System.arraycopy(from, 0, to, 0, from.length); + return to; + } else { + return from.clone(); + } + } else { + return EMPTY_DOUBLE_ARRAY; + } + } + + /* Unlike the long[]/double[] overloads there is no empty-array special case: an empty input with no matching target is cloned. */ private boolean[] copyInPlaceOrAllocate(boolean[] from, boolean[] to) { + if (to != null && to.length == from.length) { + System.arraycopy(from, 0, to, 0, from.length); + return to; + } else { + return from.clone(); + } + } + + /* Reuses the target array when its length matches, but every element is still a newly allocated HiveDecimalWritable copy. */ private HiveDecimalWritable[] copyInPlaceOrAllocate(HiveDecimalWritable[] from, HiveDecimalWritable[] to) { + if (from.length > 0) { + if (to == null || to.length != from.length) { + to = new HiveDecimalWritable[from.length]; + } + for (int i = 0; i < from.length; i++) { + to[i] = new HiveDecimalWritable(from[i]); + } + return to; + } else { + return EMPTY_DECIMAL_ARRAY; + } + } + + /* Reuses the target array when its length matches, but every element is still a fresh clone of the source Timestamp. */ private Timestamp[] copyInPlaceOrAllocate(Timestamp[] from, Timestamp[] to) { + if (from.length > 0) { + if (to == null || to.length != from.length) { + to = new Timestamp[from.length]; + } + for (int i = 0; i < from.length; i++) { + to[i] = (Timestamp) from[i].clone(); + } + return to; + } else { +
return EMPTY_TIMESTAMP_ARRAY; + } + } + + /** Overwrites {@code oldWrapper}, a recycled VectorHashKeyWrapperGeneral, with a copy of this key, reusing its arrays where their lengths already match. This key itself is only read, never modified. */ @Override + public void copyKey(KeyWrapper oldWrapper) { + VectorHashKeyWrapperGeneral clone = (VectorHashKeyWrapperGeneral) oldWrapper; + clone.hashCtx = hashCtx; + clone.keyCount = keyCount; + clone.longValues = copyInPlaceOrAllocate(longValues, clone.longValues); + clone.doubleValues = copyInPlaceOrAllocate(doubleValues, clone.doubleValues); + clone.isNull = copyInPlaceOrAllocate(isNull, clone.isNull); + clone.decimalValues = copyInPlaceOrAllocate(decimalValues, clone.decimalValues); + + if (byteLengths.length > 0) { + if (clone.byteLengths == null || clone.byteValues.length != byteValues.length) { + // byteValues and byteStarts are always the same length + clone.byteValues = new byte[byteValues.length][]; + clone.byteStarts = new int[byteValues.length]; + clone.byteLengths = byteLengths.clone(); + for (int i = 0; i < byteValues.length; ++i) { + // avoid allocation/copy of nulls, because it potentially expensive. + // branch instead. + if (byteLengths[i] != -1) { + clone.byteValues[i] = Arrays.copyOfRange(byteValues[i], + byteStarts[i], byteStarts[i] + byteLengths[i]); + } + } + } else { + System.arraycopy(byteLengths, 0, clone.byteLengths, 0, byteValues.length); + Arrays.fill(clone.byteStarts, 0); + // The clone stores every key at offset 0; this.byteStarts must stay intact because the copy loop below reads from those offsets. + for (int i = 0; i < byteValues.length; ++i) { + // avoid allocation/copy of nulls, because it potentially expensive. + // branch instead.
+ if (byteLengths[i] != -1) { + if (clone.byteValues[i] != null && clone.byteValues[i].length >= byteLengths[i]) { /* only byteLengths[i] bytes are written, at offset 0, so that is all the recycled array must hold */ + System.arraycopy(byteValues[i], byteStarts[i], clone.byteValues[i], 0, byteLengths[i]); + } else { + clone.byteValues[i] = Arrays.copyOfRange(byteValues[i], + byteStarts[i], byteStarts[i] + byteLengths[i]); + } + } + } + } + } else { + clone.byteValues = EMPTY_BYTES_ARRAY; + clone.byteStarts = EMPTY_INT_ARRAY; + clone.byteLengths = EMPTY_INT_ARRAY; + } + clone.timestampValues = copyInPlaceOrAllocate(timestampValues, clone.timestampValues); + clone.intervalDayTimeValues = copyInPlaceOrAllocate(intervalDayTimeValues, clone.intervalDayTimeValues); + + clone.hashcode = hashcode; + assert clone.equals(this); + } + + private HiveIntervalDayTime[] copyInPlaceOrAllocate(HiveIntervalDayTime[] from, HiveIntervalDayTime[] to) { + if (from.length > 0) { + if (to == null || to.length != from.length) { + to = new HiveIntervalDayTime[from.length]; + } + for (int i = 0; i < from.length; i++) { + to[i] = (HiveIntervalDayTime) from[i].clone(); + } + return to; + } else { + return EMPTY_INTERVAL_DAY_TIME_ARRAY; + } } @Override