This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ad68c0  HIVE-23952: Reuse VectorAggregationBuffer to reduce GC 
pressure in VectorGroupByOperator ( Mustafa Iman via Rajesh Balamohan)
1ad68c0 is described below

commit 1ad68c0cd2e039ae15fa222955d078d6bd3580b1
Author: Mustafa Iman <mustafai...@gmail.com>
AuthorDate: Wed Jul 29 21:49:42 2020 -0700

    HIVE-23952: Reuse VectorAggregationBuffer to reduce GC pressure in 
VectorGroupByOperator ( Mustafa Iman via Rajesh Balamohan)
    
    Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>
---
 .../hive/ql/exec/vector/VectorGroupByOperator.java | 46 ++++++++++---
 .../ql/exec/vector/TestVectorGroupByOperator.java  | 75 ++++++++++++++++++++++
 2 files changed, 111 insertions(+), 10 deletions(-)

diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 02864d9..b6cd405 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -22,12 +22,14 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.ref.SoftReference;
 import java.lang.reflect.Constructor;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -107,7 +109,8 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
   // transient.
   //---------------------------------------------------------------------------
 
-  private transient VectorAggregateExpression[] aggregators;
+  @VisibleForTesting
+  transient VectorAggregateExpression[] aggregators;
   /**
    * The aggregation buffers to use for the current batch.
    */
@@ -159,10 +162,10 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
    * Interface for processing mode: global, hash, unsorted streaming, or group 
batch
    */
   private static interface IProcessingMode {
-    public void initialize(Configuration hconf) throws HiveException;
-    public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException;
-    public void processBatch(VectorizedRowBatch batch) throws HiveException;
-    public void close(boolean aborted) throws HiveException;
+    void initialize(Configuration hconf) throws HiveException;
+    void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws 
HiveException;
+    void processBatch(VectorizedRowBatch batch) throws HiveException;
+    void close(boolean aborted) throws HiveException;
   }
 
   /**
@@ -300,6 +303,9 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
     @VisibleForTesting
     Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
 
+    private Queue<VectorAggregationBufferRow> reusableAggregationBufferRows =
+        new ArrayDeque<>(VectorizedRowBatch.DEFAULT_SIZE);
+
     /**
      * Total per hashtable entry fixed memory (does not depend on key/agg 
values).
      */
@@ -465,7 +471,23 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
     }
 
     @Override
+    protected VectorAggregationBufferRow allocateAggregationBuffer() throws 
HiveException {
+      VectorAggregationBufferRow bufferSet;
+      if (reusableAggregationBufferRows.size() > 0) {
+        bufferSet = reusableAggregationBufferRows.remove();
+        bufferSet.setVersionAndIndex(0, 0);
+        for (int i = 0; i < aggregators.length; i++) {
+          aggregators[i].reset(bufferSet.getAggregationBuffer(i));
+        }
+        return bufferSet;
+      } else {
+        return super.allocateAggregationBuffer();
+      }
+    }
+
+    @Override
     public void close(boolean aborted) throws HiveException {
+      reusableAggregationBufferRows.clear();
       if (!aborted) {
         flush(true);
       }
@@ -598,19 +620,23 @@ public class VectorGroupByOperator extends 
Operator<GroupByDesc>
           mapKeysAggregationBuffers.entrySet().iterator();
       while(iter.hasNext()) {
         Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next();
+        KeyWrapper keyWrapper = pair.getKey();
+        VectorAggregationBufferRow bufferRow = pair.getValue();
         if (!all && avgAccess >= 1) {
-          if (pair.getValue().getAccessCount() > avgAccess) {
+          if (bufferRow.getAccessCount() > avgAccess) {
             // resetting to give chance for other entries
-            totalAccessCount -= pair.getValue().getAccessCount();
-            pair.getValue().resetAccessCount();
+            totalAccessCount -= bufferRow.getAccessCount();
+            bufferRow.resetAccessCount();
             continue;
           }
         }
 
-        writeSingleRow((VectorHashKeyWrapperBase) pair.getKey(), 
pair.getValue());
+        writeSingleRow((VectorHashKeyWrapperBase) keyWrapper, bufferRow);
 
         if (!all) {
-          totalAccessCount -= pair.getValue().getAccessCount();
+          totalAccessCount -= bufferRow.getAccessCount();
+          reusableAggregationBufferRows.add(bufferRow);
+          bufferRow.resetAccessCount();
           iter.remove();
           --numEntriesHashTable;
           if (++entriesFlushed >= entriesToFlush) {
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index d6a8548..b229292 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.hive.ql.exec.vector;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
@@ -43,6 +46,7 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import 
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import 
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
 import 
org.apache.hadoop.hive.ql.exec.vector.util.FakeCaptureVectorToRowOutputOperator;
 import org.apache.hadoop.hive.ql.exec.vector.util.FakeVectorRowBatchFromConcat;
@@ -84,6 +88,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Unit test for the vectorized GROUP BY operator.
@@ -610,6 +615,76 @@ public class TestVectorGroupByOperator {
   }
 
   @Test
+  public void testRollupAggregationWithBufferReuse() throws HiveException {
+    List<String> mapColumnNames = new ArrayList<String>();
+    mapColumnNames.add("k1");
+    mapColumnNames.add("k2");
+    mapColumnNames.add("v");
+    VectorizationContext ctx = new VectorizationContext("name", 
mapColumnNames);
+
+    // select count(v) from name group by rollup (k1,k2);
+
+    Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, 
"count",
+        "v", TypeInfoFactory.longTypeInfo,
+        new String[] { "k1", "k2" },
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo, 
TypeInfoFactory.longTypeInfo});
+    GroupByDesc desc = pair.left;
+    VectorGroupByDesc vectorDesc = pair.right;
+
+    desc.setGroupingSetsPresent(true);
+    ArrayList<Long> groupingSets = new ArrayList<>();
+    // groupingSets
+    groupingSets.add(0L);
+    groupingSets.add(1L);
+    groupingSets.add(2L);
+    desc.setListGroupingSets(groupingSets);
+    // add grouping sets dummy key
+    ExprNodeDesc groupingSetDummyKey = new 
ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+
+    desc.getKeys().add(groupingSetDummyKey);
+    // groupingSet Position
+    desc.setGroupingSetPosition(2);
+
+    CompilationOpContext cCtx = new CompilationOpContext();
+
+    desc.setMinReductionHashAggr(0.5f);
+
+    Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, 
desc);
+
+    VectorGroupByOperator vgo =
+        (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, 
ctx, vectorDesc);
+
+    FakeCaptureVectorToRowOutputOperator out = 
FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+    vgo.initialize(hconf, null);
+
+    //Get the processing mode
+    VectorGroupByOperator.ProcessingModeHashAggregate processingMode =
+        (VectorGroupByOperator.ProcessingModeHashAggregate) vgo.processingMode;
+    VectorAggregateExpression spyAggregator = spy(vgo.aggregators[0]);
+    vgo.aggregators[0] = spyAggregator;
+
+    FakeVectorRowBatchFromObjectIterables data = getDataForRollup();
+
+    long countRowsProduced = 0;
+    for (VectorizedRowBatch unit: data) {
+      countRowsProduced += unit.size;
+      vgo.process(unit,  0);
+
+      // trigger flush frequently to simulate operator working on many batches
+      processingMode.gcCanary.clear();
+
+      if (countRowsProduced >= 1000) {
+        break;
+      }
+    }
+
+    vgo.close(false);
+    // The exact number of allocations depend on input. In this case it is 13.
+    // Without buffer reuse, we allocate 512 buffers for the same input
+    verify(spyAggregator, times(13)).getNewAggregationBuffer();
+  }
+
+  @Test
   public void testRollupAggregationWithFlush() throws HiveException {
 
     List<String> mapColumnNames = new ArrayList<String>();

Reply via email to