This is an automated email from the ASF dual-hosted git repository. hashutosh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 1ad68c0 HIVE-23952: Reuse VectorAggregationBuffer to reduce GC pressure in VectorGroupByOperator ( Mustafa Iman via Rajesh Balamohan) 1ad68c0 is described below commit 1ad68c0cd2e039ae15fa222955d078d6bd3580b1 Author: Mustafa Iman <mustafai...@gmail.com> AuthorDate: Wed Jul 29 21:49:42 2020 -0700 HIVE-23952: Reuse VectorAggregationBuffer to reduce GC pressure in VectorGroupByOperator ( Mustafa Iman via Rajesh Balamohan) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> --- .../hive/ql/exec/vector/VectorGroupByOperator.java | 46 ++++++++++--- .../ql/exec/vector/TestVectorGroupByOperator.java | 75 ++++++++++++++++++++++ 2 files changed, 111 insertions(+), 10 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 02864d9..b6cd405 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -22,12 +22,14 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.lang.ref.SoftReference; import java.lang.reflect.Constructor; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; @@ -107,7 +109,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> // transient. //--------------------------------------------------------------------------- - private transient VectorAggregateExpression[] aggregators; + @VisibleForTesting + transient VectorAggregateExpression[] aggregators; /** * The aggregation buffers to use for the current batch. */ @@ -159,10 +162,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> * Interface for processing mode: global, hash, unsorted streaming, or group batch */ private static interface IProcessingMode { - public void initialize(Configuration hconf) throws HiveException; - public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException; - public void processBatch(VectorizedRowBatch batch) throws HiveException; - public void close(boolean aborted) throws HiveException; + void initialize(Configuration hconf) throws HiveException; + void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException; + void processBatch(VectorizedRowBatch batch) throws HiveException; + void close(boolean aborted) throws HiveException; } /** @@ -300,6 +303,9 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> @VisibleForTesting Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers; + private Queue<VectorAggregationBufferRow> reusableAggregationBufferRows = + new ArrayDeque<>(VectorizedRowBatch.DEFAULT_SIZE); + /** * Total per hashtable entry fixed memory (does not depend on key/agg values). */ @@ -465,7 +471,23 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> } @Override + protected VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException { + VectorAggregationBufferRow bufferSet; + if (reusableAggregationBufferRows.size() > 0) { + bufferSet = reusableAggregationBufferRows.remove(); + bufferSet.setVersionAndIndex(0, 0); + for (int i = 0; i < aggregators.length; i++) { + aggregators[i].reset(bufferSet.getAggregationBuffer(i)); + } + return bufferSet; + } else { + return super.allocateAggregationBuffer(); + } + } + + @Override public void close(boolean aborted) throws HiveException { + reusableAggregationBufferRows.clear(); if (!aborted) { flush(true); } @@ -598,19 +620,23 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> mapKeysAggregationBuffers.entrySet().iterator(); while(iter.hasNext()) { Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next(); + KeyWrapper keyWrapper = pair.getKey(); + VectorAggregationBufferRow bufferRow = pair.getValue(); if (!all && avgAccess >= 1) { - if (pair.getValue().getAccessCount() > avgAccess) { + if (bufferRow.getAccessCount() > avgAccess) { // resetting to give chance for other entries - totalAccessCount -= pair.getValue().getAccessCount(); - pair.getValue().resetAccessCount(); + totalAccessCount -= bufferRow.getAccessCount(); + bufferRow.resetAccessCount(); continue; } } - writeSingleRow((VectorHashKeyWrapperBase) pair.getKey(), pair.getValue()); + writeSingleRow((VectorHashKeyWrapperBase) keyWrapper, bufferRow); if (!all) { - totalAccessCount -= pair.getValue().getAccessCount(); + totalAccessCount -= bufferRow.getAccessCount(); + reusableAggregationBufferRows.add(bufferRow); + bufferRow.resetAccessCount(); iter.remove(); --numEntriesHashTable; if (++entriesFlushed >= entriesToFlush) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java index d6a8548..b229292 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java @@ -21,6 +21,9 @@ package org.apache.hadoop.hive.ql.exec.vector; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; @@ -43,6 +46,7 @@ import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.KeyWrapper; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar; import org.apache.hadoop.hive.ql.exec.vector.util.FakeCaptureVectorToRowOutputOperator; import org.apache.hadoop.hive.ql.exec.vector.util.FakeVectorRowBatchFromConcat; @@ -84,6 +88,7 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; /** * Unit test for the vectorized GROUP BY operator. @@ -610,6 +615,76 @@ public class TestVectorGroupByOperator { } @Test + public void testRollupAggregationWithBufferReuse() throws HiveException { + List<String> mapColumnNames = new ArrayList<String>(); + mapColumnNames.add("k1"); + mapColumnNames.add("k2"); + mapColumnNames.add("v"); + VectorizationContext ctx = new VectorizationContext("name", mapColumnNames); + + // select count(v) from name group by rollup (k1,k2); + + Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "count", + "v", TypeInfoFactory.longTypeInfo, + new String[] { "k1", "k2" }, + new TypeInfo[] {TypeInfoFactory.longTypeInfo, TypeInfoFactory.longTypeInfo}); + GroupByDesc desc = pair.left; + VectorGroupByDesc vectorDesc = pair.right; + + desc.setGroupingSetsPresent(true); + ArrayList<Long> groupingSets = new ArrayList<>(); + // groupingSets + groupingSets.add(0L); + groupingSets.add(1L); + groupingSets.add(2L); + desc.setListGroupingSets(groupingSets); + // add grouping sets dummy key + ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L); + + desc.getKeys().add(groupingSetDummyKey); + // groupingSet Position + desc.setGroupingSetPosition(2); + + CompilationOpContext cCtx = new CompilationOpContext(); + + desc.setMinReductionHashAggr(0.5f); + + Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc); + + VectorGroupByOperator vgo = + (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc); + + FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo); + vgo.initialize(hconf, null); + + //Get the processing mode + VectorGroupByOperator.ProcessingModeHashAggregate processingMode = + (VectorGroupByOperator.ProcessingModeHashAggregate) vgo.processingMode; + VectorAggregateExpression spyAggregator = spy(vgo.aggregators[0]); + vgo.aggregators[0] = spyAggregator; + + FakeVectorRowBatchFromObjectIterables data = getDataForRollup(); + + long countRowsProduced = 0; + for (VectorizedRowBatch unit: data) { + countRowsProduced += unit.size; + vgo.process(unit, 0); + + // trigger flush frequently to simulate operator working on many batches + processingMode.gcCanary.clear(); + + if (countRowsProduced >= 1000) { + break; + } + } + + vgo.close(false); + // The exact number of allocations depend on input. In this case it is 13. + // Without buffer reuse, we allocate 512 buffers for the same input + verify(spyAggregator, times(13)).getNewAggregationBuffer(); + } + + @Test public void testRollupAggregationWithFlush() throws HiveException { List<String> mapColumnNames = new ArrayList<String>();