This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit 380be9afd1a364fe0ff83e61e17ba4ced12f29a0
Author: Rajesh Balamohan <rbalamo...@apache.org>
AuthorDate: Tue Jul 14 10:00:14 2020 +0530

    HIVE-23843: Improve key evictions in VectorGroupByOperator (Rajesh Balamohan via Ashutosh Chauhan, Zoltan Haindrich)
    
    Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>
---
 .../ql/exec/vector/VectorAggregationBufferRow.java |  12 +-
 .../hive/ql/exec/vector/VectorGroupByOperator.java |  53 +++++++--
 .../ql/exec/vector/TestVectorGroupByOperator.java  | 125 ++++++++++++++++++---
 3 files changed, 168 insertions(+), 22 deletions(-)
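
The change targets the hash-aggregation mode: previously a partial flush evicted an arbitrary slice of the hash map, dropping hot keys as readily as cold ones. The patch adds a per-entry access counter and, on a partial flush, retains entries whose access count exceeds the running average. A minimal standalone sketch of that retention rule (all names here are illustrative stand-ins, not the patch itself):

    import java.util.Iterator;
    import java.util.Map;

    // Minimal sketch of the average-access retention rule; "Entry" and the
    // method names are hypothetical stand-ins for the patch's real types.
    public class AvgAccessEvictionSketch {
      static final class Entry {
        int accessCount; // bumped each time an existing key is hit again
      }

      // Evict only entries at or below the average access count, mirroring
      // the check this patch adds to flush(boolean all).
      static void partialFlush(Map<String, Entry> map, long totalAccessCount) {
        if (map.isEmpty()) {
          return;
        }
        long avgAccess = totalAccessCount / map.size(); // integer division, as in computeAvgAccess()
        Iterator<Map.Entry<String, Entry>> iter = map.entrySet().iterator();
        while (iter.hasNext()) {
          Entry e = iter.next().getValue();
          if (avgAccess >= 1 && e.accessCount > avgAccess) {
            continue; // hot key: keep it in the hash table
          }
          iter.remove(); // cold key: emit its row and evict (emission elided here)
        }
      }
    }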

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
index 494db35..a7ef154 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
@@ -28,7 +28,8 @@ public class VectorAggregationBufferRow {
   private VectorAggregateExpression.AggregationBuffer[] aggregationBuffers;
   private int version;
   private int index;
-  
+  private int accessed = 0;
+
   public VectorAggregationBufferRow(
       VectorAggregateExpression.AggregationBuffer[] aggregationBuffers) {
     this.aggregationBuffers = aggregationBuffers;
@@ -80,5 +81,12 @@ public class VectorAggregationBufferRow {
       aggregationBuffers[i].reset();
     }
   }
-  
+
+  public int getAccessCount() {
+    return accessed;
+  }
+
+  public void incrementAccessCount() {
+    accessed++;
+  }
 }
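
The new counter is deliberately minimal: the operator increments it whenever an existing key is hit again and reads it back at flush time. A hedged usage sketch (the empty AggregationBuffer array is only there to make the snippet self-contained):

    // Hypothetical usage of the counter added above:
    VectorAggregationBufferRow row = new VectorAggregationBufferRow(
        new VectorAggregateExpression.AggregationBuffer[0]);
    row.incrementAccessCount();      // called on each repeated hit of an existing key
    int hits = row.getAccessCount(); // read during flush to compare against the average
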
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 9f81e8e..85535f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -151,6 +151,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   private float memoryThreshold;
 
   private boolean isLlap = false;
+
+  // Tracks the overall access count across the map aggregation buffers at any given time.
+  private long totalAccessCount;
+
   /**
    * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
@@ -251,7 +255,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
    * This mode is very simple, there are no keys to consider, and only flushes one row at closing
    * The one row must flush even if no input was seen (NULLs)
    */
-  private class ProcessingModeGlobalAggregate extends ProcessingModeBase {
+  final class ProcessingModeGlobalAggregate extends ProcessingModeBase {
 
     /**
      * In global processing mode there is only one set of aggregation buffers
@@ -288,12 +292,13 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   /**
    * Hash Aggregate mode processing
    */
-  private class ProcessingModeHashAggregate extends ProcessingModeBase {
+  final class ProcessingModeHashAggregate extends ProcessingModeBase {
 
     /**
      * The global key-aggregation hash map.
      */
-    private Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
+    @VisibleForTesting
+    Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
 
     /**
     * Total per hashtable entry fixed memory (does not depend on key/agg values).
@@ -334,7 +339,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
     /**
      * A soft reference used to detect memory pressure
      */
-    private SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
+    @VisibleForTesting
+    SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
 
     /**
     * Counts the number of times the gcCanary died and was resurrected
@@ -387,10 +393,19 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
       sumBatchSize = 0;
 
      mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
+      if (groupingSets != null && groupingSets.length > 0) {
+        this.maxHtEntries = this.maxHtEntries / groupingSets.length;
+        LOG.info("New maxHtEntries: {}, groupingSets len: {}", maxHtEntries, groupingSets.length);
+      }
       computeMemoryLimits();
       LOG.debug("using hash aggregation processing mode");
     }
 
+    @VisibleForTesting
+    int getMaxHtEntries() {
+      return maxHtEntries;
+    }
+
     @Override
     public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
         boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
@@ -502,6 +517,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
           mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
           numEntriesHashTable++;
           numEntriesSinceCheck++;
+        } else {
+          // for access tracking
+          aggregationBuffer.incrementAccessCount();
+          totalAccessCount++;
         }
         aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i);
       }
@@ -540,6 +559,16 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
       }
     }
 
+    int computeAvgAccess() {
+      if (numEntriesHashTable == 0) {
+        return 0;
+      }
+      int avgAccess = (int) (totalAccessCount / numEntriesHashTable);
+      LOG.debug("totalAccessCount:{}, numEntries:{}, avgAccess:{}",
+          totalAccessCount, numEntriesHashTable, avgAccess);
+      return avgAccess;
+    }
+
     /**
     * Flushes the entries in the hash table by emitting output (forward).
      * When parameter 'all' is true all the entries are flushed.
@@ -561,6 +590,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
             maxHashTblMemory/1024/1024,
             gcCanary.get() == null ? "dead" : "alive"));
       }
+      int avgAccess = computeAvgAccess();
 
       /* Iterate the global (keywrapper,aggregationbuffers) map and emit
        a row for each key */
@@ -568,10 +598,17 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
           mapKeysAggregationBuffers.entrySet().iterator();
       while(iter.hasNext()) {
         Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next();
+        if (!all && avgAccess >= 1) {
+          // Retain entries whose access count is greater than the average
+          if (pair.getValue().getAccessCount() > avgAccess) {
+            continue;
+          }
+        }
 
         writeSingleRow((VectorHashKeyWrapperBase) pair.getKey(), pair.getValue());
 
         if (!all) {
+          totalAccessCount -= pair.getValue().getAccessCount();
           iter.remove();
           --numEntriesHashTable;
           if (++entriesFlushed >= entriesToFlush) {
@@ -582,6 +619,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
 
       if (all) {
         mapKeysAggregationBuffers.clear();
+        totalAccessCount = 0;
         numEntriesHashTable = 0;
       }
 
@@ -674,7 +712,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
    * Streaming processing mode on ALREADY GROUPED data. Each input VectorizedRowBatch may
    * have a mix of different keys.  Intermediate values are flushed each time key changes.
    */
-  private class ProcessingModeStreaming extends ProcessingModeBase {
+  final class ProcessingModeStreaming extends ProcessingModeBase {
 
     /**
      * The aggregation buffers used in streaming mode
@@ -816,7 +854,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
    *      writeGroupRow does this and finally increments outputBatch.size.
    *
    */
-  private class ProcessingModeReduceMergePartial extends ProcessingModeBase {
+  final class ProcessingModeReduceMergePartial extends ProcessingModeBase {
 
     private boolean first;
     private boolean isLastGroupBatch;
@@ -896,7 +934,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   /**
   * Current processing mode. Processing mode can change (eg. hash -> streaming).
    */
-  private transient IProcessingMode processingMode;
+  @VisibleForTesting
+  transient IProcessingMode processingMode;
 
   private static final long serialVersionUID = 1L;
 
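The grouping-sets adjustment in initializeOp() above divides the hash-table entry budget by the number of grouping sets, since each input row now occupies one entry per grouping set. Assuming a default budget of 1,000,000 entries (an assumption inferred from the 333333 assertion in the test below), a ROLLUP over two keys works out as:

    // Worked example of the maxHtEntries adjustment (the 1,000,000 default
    // is an assumption inferred from the test's assertEquals(333333, ...)):
    int maxHtEntries = 1_000_000;
    long[] groupingSets = {0L, 1L, 2L};                // ROLLUP(k1, k2) => 3 grouping sets
    maxHtEntries = maxHtEntries / groupingSets.length; // 1_000_000 / 3 = 333_333
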
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index 3835987..c22a833 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -39,9 +38,9 @@ import java.util.Set;
 import org.apache.calcite.util.Pair;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
@@ -76,8 +75,6 @@ import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BooleanWritable;
@@ -499,6 +496,31 @@ public class TestVectorGroupByOperator {
       }
     });
 
+    // vrb of 1 row each
+    FakeVectorRowBatchFromObjectIterables data = getDataForRollup();
+
+    long countRowsProduced = 0;
+    for (VectorizedRowBatch unit: data) {
+      // after 24 rows, we'd have seen all the keys
+      // find 14 keys in the hashmap
+      // but 24*0.5 = 12
+      // won't turn off hash mode because of the 3 grouping sets
+      // if it turns off the hash mode, we'd get 14 + 3*(100-24) rows
+      countRowsProduced += unit.size;
+      vgo.process(unit,  0);
+
+      if (countRowsProduced >= 100) {
+        break;
+      }
+
+    }
+    vgo.close(false);
+    // all groupings
+    // 10 keys generates 14 rows with the rollup
+    assertEquals(1+3+10, outputRowCount);
+  }
+
  FakeVectorRowBatchFromObjectIterables getDataForRollup() throws HiveException {
     // k1 has nDV of 2
     Iterable<Object> k1 = new Iterable<Object>() {
       @Override
@@ -578,33 +600,110 @@ public class TestVectorGroupByOperator {
     };
 
     // vrb of 1 row each
-    FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+    return new FakeVectorRowBatchFromObjectIterables(
         2,
         new String[] {"long", "long", "long", "long"},
         k1,
         k2,
         v,
         v); // output col
+  }
+
+  @Test
+  public void testRollupAggregationWithFlush() throws HiveException {
+
+    List<String> mapColumnNames = new ArrayList<String>();
+    mapColumnNames.add("k1");
+    mapColumnNames.add("k2");
+    mapColumnNames.add("v");
+    VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+    // select count(v) from name group by rollup (k1,k2);
+
+    Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "count",
+        "v", TypeInfoFactory.longTypeInfo,
+        new String[] { "k1", "k2" },
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo, TypeInfoFactory.longTypeInfo});
+    GroupByDesc desc = pair.left;
+    VectorGroupByDesc vectorDesc = pair.right;
+
+    desc.setGroupingSetsPresent(true);
+    ArrayList<Long> groupingSets = new ArrayList<>();
+    // groupingSets
+    groupingSets.add(0L);
+    groupingSets.add(1L);
+    groupingSets.add(2L);
+    desc.setListGroupingSets(groupingSets);
+    // add grouping sets dummy key
+    ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+    // this only works because we used an arraylist in buildKeyGroupByDesc
+    // don't do this in actual compiler
+    desc.getKeys().add(groupingSetDummyKey);
+    // groupingSet Position
+    desc.setGroupingSetPosition(2);
+
+    CompilationOpContext cCtx = new CompilationOpContext();
+
+    desc.setMinReductionHashAggr(0.5f);
+    // Set really low check interval setting
+    hconf.set("hive.groupby.mapaggr.checkinterval", "10");
+    hconf.set("hive.vectorized.groupby.checkinterval", "10");
+
+    Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+    VectorGroupByOperator vgo =
+        (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+    FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+    vgo.initialize(hconf, null);
+
+    //Get the processing mode
+    VectorGroupByOperator.ProcessingModeHashAggregate processingMode =
+        (VectorGroupByOperator.ProcessingModeHashAggregate) vgo.processingMode;
+    assertEquals(333333,
+        ((VectorGroupByOperator.ProcessingModeHashAggregate)vgo.processingMode).getMaxHtEntries());
+
+    this.outputRowCount = 0;
+    out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+      @Override
+      public void inspectRow(Object row, int tag) throws HiveException {
+        ++outputRowCount;
+      }
+    });
+
+    FakeVectorRowBatchFromObjectIterables data = getDataForRollup();
 
     long countRowsProduced = 0;
+    long numElementsToBeRetained = 0;
+    int avgAccess = 0;
     for (VectorizedRowBatch unit: data) {
-      // after 24 rows, we'd have seen all the keys
-      // find 14 keys in the hashmap
-      // but 24*0.5 = 12
-      // won't turn off hash mode because of the 3 grouping sets
-      // if it turns off the hash mode, we'd get 14 + 3*(100-24) rows
       countRowsProduced += unit.size;
       vgo.process(unit,  0);
 
       if (countRowsProduced >= 100) {
+        // note down avg access
+        avgAccess = processingMode.computeAvgAccess();
+        numElementsToBeRetained = getElementsHigherThan(processingMode.mapKeysAggregationBuffers, avgAccess);
+        // trigger flush explicitly on next iteration
+        processingMode.gcCanary.clear();
         break;
       }
+    }
+
+    // This processing would trigger flush
+    for (VectorizedRowBatch unit: data) {
+      vgo.process(unit,  0);
+      long freqElementsAfterFlush = getElementsHigherThan(processingMode.mapKeysAggregationBuffers, avgAccess);
 
+      assertTrue("After flush: " + freqElementsAfterFlush + ", before flush: " + numElementsToBeRetained,
+          (freqElementsAfterFlush >= numElementsToBeRetained));
+      break;
     }
     vgo.close(false);
-    // all groupings
-    // 10 keys generates 14 rows with the rollup
-    assertEquals(1+3+10, outputRowCount);
+  }
+
+  long getElementsHigherThan(Map<KeyWrapper, VectorAggregationBufferRow> aggMap, int avgAccess) {
+    return aggMap.values().stream().filter(v -> (v.getAccessCount() > avgAccess)).count();
   }
 
   @Test
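
A subtlety worth noting in computeAvgAccess(): totalAccessCount counts only repeated hits (the first insertion of a key does not increment it), and the average uses integer division. A short worked example of the consequences (values chosen for illustration):

    // Worked example of computeAvgAccess()'s integer division:
    long totalAccessCount = 5;   // repeated hits only; first insertions are not counted
    long numEntriesHashTable = 3;
    int avgAccess = (int) (totalAccessCount / numEntriesHashTable); // 1, not 1.67
    // An entry survives a partial flush only if accessCount > avgAccess (strictly).
    // When every key has been seen exactly once, totalAccessCount is 0, so
    // avgAccess is 0 and the "avgAccess >= 1" guard skips retention entirely.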
