This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
commit 380be9afd1a364fe0ff83e61e17ba4ced12f29a0
Author: Rajesh Balamohan <rbalamo...@apache.org>
AuthorDate: Tue Jul 14 10:00:14 2020 +0530

    HIVE-23843: Improve key evictions in VectorGroupByOperator (Rajesh Balamohan via Ashutosh Chauhan, Zoltan Haindrich)

    Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>
---
 .../ql/exec/vector/VectorAggregationBufferRow.java |  12 +-
 .../hive/ql/exec/vector/VectorGroupByOperator.java |  53 +++++++--
 .../ql/exec/vector/TestVectorGroupByOperator.java  | 125 ++++++++++++++++++---
 3 files changed, 168 insertions(+), 22 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
index 494db35..a7ef154 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
@@ -28,7 +28,8 @@ public class VectorAggregationBufferRow {
   private VectorAggregateExpression.AggregationBuffer[] aggregationBuffers;
   private int version;
   private int index;
-
+  private int accessed = 0;
+
   public VectorAggregationBufferRow(
       VectorAggregateExpression.AggregationBuffer[] aggregationBuffers) {
     this.aggregationBuffers = aggregationBuffers;
@@ -80,5 +81,12 @@ public class VectorAggregationBufferRow {
       aggregationBuffers[i].reset();
     }
   }
-
+
+  public int getAccessCount() {
+    return accessed;
+  }
+
+  public void incrementAccessCount() {
+    accessed++;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 9f81e8e..85535f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -151,6 +151,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   private float memoryThreshold;
   private boolean isLlap = false;
+
+  // Tracks the overall access count across the map aggregation buffers at any given time.
+  private long totalAccessCount;
+
   /**
    * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
@@ -251,7 +255,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
    * This mode is very simple, there are no keys to consider, and only flushes one row at closing
    * The one row must flush even if no input was seen (NULLs)
    */
-  private class ProcessingModeGlobalAggregate extends ProcessingModeBase {
+  final class ProcessingModeGlobalAggregate extends ProcessingModeBase {

    /**
     * In global processing mode there is only one set of aggregation buffers
@@ -288,12 +292,13 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   /**
    * Hash Aggregate mode processing
    */
-  private class ProcessingModeHashAggregate extends ProcessingModeBase {
+  final class ProcessingModeHashAggregate extends ProcessingModeBase {

     /**
      * The global key-aggregation hash map.
      */
-    private Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
+    @VisibleForTesting
+    Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;

     /**
      * Total per hashtable entry fixed memory (does not depend on key/agg values).
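Before the eviction logic below, it is worth pausing on what the hunks above establish: every live VectorAggregationBufferRow now carries a per-key hit counter, and the hash-aggregate processing mode keeps a running total of hits across the whole map. The standalone sketch below models that bookkeeping in miniature; the class and field names are invented for illustration and are not Hive's actual types:

    import java.util.HashMap;
    import java.util.Map;

    class AggBufferRowSketch {
      private int accessed = 0;                // per-key hit counter, as in VectorAggregationBufferRow

      int getAccessCount() { return accessed; }
      void incrementAccessCount() { accessed++; }
    }

    class HashAggLookupSketch {
      private final Map<String, AggBufferRowSketch> buffers = new HashMap<>();
      private long totalAccessCount = 0;       // running total, as in VectorGroupByOperator

      AggBufferRowSketch lookup(String key) {
        AggBufferRowSketch row = buffers.get(key);
        if (row == null) {
          // First sighting of the key: insert it, but credit no access yet.
          row = new AggBufferRowSketch();
          buffers.put(key, row);
        } else {
          // Repeat sighting: credit both the key and the running total.
          row.incrementAccessCount();
          totalAccessCount++;
        }
        return row;
      }
    }

Note that the counter is credited only on repeat lookups, so a key's count measures how often it recurred after insertion; the doProcessBatch() hunk below follows the same convention.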
@@ -334,7 +339,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
     /**
      * A soft reference used to detect memory pressure
      */
-    private SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
+    @VisibleForTesting
+    SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());

     /**
      * Counts the number of times the gcCanary died and was resurrected
@@ -387,10 +393,19 @@
       sumBatchSize = 0;

       mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
+      if (groupingSets != null && groupingSets.length > 0) {
+        this.maxHtEntries = this.maxHtEntries / groupingSets.length;
+        LOG.info("New maxHtEntries: {}, groupingSets len: {}", maxHtEntries, groupingSets.length);
+      }
       computeMemoryLimits();
       LOG.debug("using hash aggregation processing mode");
     }

+    @VisibleForTesting
+    int getMaxHtEntries() {
+      return maxHtEntries;
+    }
+
     @Override
     public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
         boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
@@ -502,6 +517,10 @@
           mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
           numEntriesHashTable++;
           numEntriesSinceCheck++;
+        } else {
+          // for access tracking
+          aggregationBuffer.incrementAccessCount();
+          totalAccessCount++;
         }
         aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i);
       }
@@ -540,6 +559,16 @@
       }
     }

+    int computeAvgAccess() {
+      if (numEntriesHashTable == 0) {
+        return 0;
+      }
+      int avgAccess = (int) (totalAccessCount / numEntriesHashTable);
+      LOG.debug("totalAccessCount:{}, numEntries:{}, avgAccess:{}",
+          totalAccessCount, numEntriesHashTable, avgAccess);
+      return avgAccess;
+    }
+
     /**
      * Flushes the entries in the hash table by emitting output (forward).
      * When parameter 'all' is true all the entries are flushed.
@@ -561,6 +590,7 @@
             maxHashTblMemory/1024/1024,
             gcCanary.get() == null ? "dead" : "alive"));
       }
+      int avgAccess = computeAvgAccess();

       /* Iterate the global (keywrapper,aggregationbuffers) map and emit
       a row for each key */
@@ -568,10 +598,17 @@
           mapKeysAggregationBuffers.entrySet().iterator();
       while(iter.hasNext()) {
         Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next();
+        if (!all && avgAccess >= 1) {
+          // Retain entries whose access count is greater than the average access count
+          if (pair.getValue().getAccessCount() > avgAccess) {
+            continue;
+          }
+        }

         writeSingleRow((VectorHashKeyWrapperBase) pair.getKey(), pair.getValue());

         if (!all) {
+          totalAccessCount -= pair.getValue().getAccessCount();
           iter.remove();
           --numEntriesHashTable;
           if (++entriesFlushed >= entriesToFlush) {
@@ -582,6 +619,7 @@

       if (all) {
         mapKeysAggregationBuffers.clear();
+        totalAccessCount = 0;
         numEntriesHashTable = 0;
       }

@@ -674,7 +712,7 @@
    * Streaming processing mode on ALREADY GROUPED data. Each input VectorizedRowBatch may
    * have a mix of different keys. Intermediate values are flushed each time key changes.
    */
-  private class ProcessingModeStreaming extends ProcessingModeBase {
+  final class ProcessingModeStreaming extends ProcessingModeBase {

     /**
      * The aggregation buffers used in streaming mode
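The eviction policy these hunks add is compact: on a partial flush, entries whose access count exceeds the current average are retained in the hash table on the theory that hot keys will be hit again; colder entries are emitted and removed, and the running total is decremented so the average stays meaningful. A full flush still clears everything. A simplified standalone model of that loop (illustrative names, not the Hive implementation):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    class EvictionSketch {
      private final Map<String, Integer> accessCounts = new HashMap<>(); // key -> access count
      private long totalAccessCount = 0;

      int computeAvgAccess() {
        if (accessCounts.isEmpty()) {
          return 0;
        }
        return (int) (totalAccessCount / accessCounts.size());
      }

      void flush(boolean all) {
        int avgAccess = computeAvgAccess();
        Iterator<Map.Entry<String, Integer>> iter = accessCounts.entrySet().iterator();
        while (iter.hasNext()) {
          Map.Entry<String, Integer> entry = iter.next();
          if (!all && avgAccess >= 1 && entry.getValue() > avgAccess) {
            continue;                             // hot key: keep it in the hash table
          }
          emit(entry.getKey());                   // stand-in for writeSingleRow(...)
          if (!all) {
            totalAccessCount -= entry.getValue(); // keep the running total consistent
            iter.remove();
          }
        }
        if (all) {
          accessCounts.clear();
          totalAccessCount = 0;
        }
      }

      private void emit(String key) {
        System.out.println("flushed " + key);     // placeholder for forwarding a row downstream
      }
    }

The real flush() additionally caps how many entries a single partial flush may remove (entriesToFlush) and watches the gcCanary SoftReference for memory pressure; both are elided here.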
@@ -816,7 +854,7 @@
    * writeGroupRow does this and finally increments outputBatch.size.
    *
    */
-  private class ProcessingModeReduceMergePartial extends ProcessingModeBase {
+  final class ProcessingModeReduceMergePartial extends ProcessingModeBase {

     private boolean first;
     private boolean isLastGroupBatch;
@@ -896,7 +934,8 @@
   /**
    * Current processing mode. Processing mode can change (eg. hash -> streaming).
    */
-  private transient IProcessingMode processingMode;
+  @VisibleForTesting
+  transient IProcessingMode processingMode;

   private static final long serialVersionUID = 1L;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index 3835987..c22a833 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.vector;

 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;

@@ -39,9 +38,9 @@ import java.util.Set;
 import org.apache.calcite.util.Pair;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
@@ -76,8 +75,6 @@ import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BooleanWritable;
@@ -499,6 +496,31 @@ public class TestVectorGroupByOperator {
       }
     });

+    // vrb of 1 row each
+    FakeVectorRowBatchFromObjectIterables data = getDataForRollup();
+
+    long countRowsProduced = 0;
+    for (VectorizedRowBatch unit: data) {
+      // after 24 rows, we'd have seen all the keys
+      // find 14 keys in the hashmap
+      // but 24*0.5 = 12
+      // won't turn off hash mode because of the 3 grouping sets
+      // if it turns off the hash mode, we'd get 14 + 3*(100-24) rows
+      countRowsProduced += unit.size;
+      vgo.process(unit, 0);
+
+      if (countRowsProduced >= 100) {
+        break;
+      }
+
+    }
+    vgo.close(false);
+    // all groupings
+    // 10 keys generates 14 rows with the rollup
+    assertEquals(1+3+10, outputRowCount);
+  }
+
+  FakeVectorRowBatchFromObjectIterables getDataForRollup() throws HiveException {
     // k1 has nDV of 2
     Iterable<Object> k1 = new Iterable<Object>() {
       @Override
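To spell out the arithmetic in the test comments above (a reading of the test's own numbers, not text from the commit): rollup(k1, k2) runs three grouping sets, so the fake data's 10 distinct (k1, k2) pairs expand to 1 + 3 + 10 = 14 aggregation keys across the three rollup levels, which is exactly what assertEquals(1+3+10, outputRowCount) checks. After 24 input rows every key has been seen, and 14 surviving keys is above the min-reduction cutoff of 24 * 0.5 = 12; per the comments, hash mode nevertheless stays on because those 14 keys are spread across the 3 grouping sets.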
@@ -578,33 +600,110 @@ public class TestVectorGroupByOperator {
     };

     // vrb of 1 row each
-    FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+    return new FakeVectorRowBatchFromObjectIterables(
         2,
         new String[] {"long", "long", "long", "long"},
         k1, k2, v, v); // output col
+  }
+
+  @Test
+  public void testRollupAggregationWithFlush() throws HiveException {
+
+    List<String> mapColumnNames = new ArrayList<String>();
+    mapColumnNames.add("k1");
+    mapColumnNames.add("k2");
+    mapColumnNames.add("v");
+    VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+    // select count(v) from name group by rollup (k1,k2);
+
+    Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "count",
+        "v", TypeInfoFactory.longTypeInfo,
+        new String[] { "k1", "k2" },
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo, TypeInfoFactory.longTypeInfo});
+    GroupByDesc desc = pair.left;
+    VectorGroupByDesc vectorDesc = pair.right;
+
+    desc.setGroupingSetsPresent(true);
+    ArrayList<Long> groupingSets = new ArrayList<>();
+    // groupingSets
+    groupingSets.add(0L);
+    groupingSets.add(1L);
+    groupingSets.add(2L);
+    desc.setListGroupingSets(groupingSets);
+    // add grouping sets dummy key
+    ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+    // This only works because we used an ArrayList in buildKeyGroupByDesc;
+    // don't do this in the actual compiler.
+    desc.getKeys().add(groupingSetDummyKey);
+    // groupingSet position
+    desc.setGroupingSetPosition(2);
+
+    CompilationOpContext cCtx = new CompilationOpContext();
+
+    desc.setMinReductionHashAggr(0.5f);
+    // Set a really low check interval
+    hconf.set("hive.groupby.mapaggr.checkinterval", "10");
+    hconf.set("hive.vectorized.groupby.checkinterval", "10");
+
+    Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+    VectorGroupByOperator vgo =
+        (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+    FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+    vgo.initialize(hconf, null);
+
+    // Get the processing mode
+    VectorGroupByOperator.ProcessingModeHashAggregate processingMode =
+        (VectorGroupByOperator.ProcessingModeHashAggregate) vgo.processingMode;
+    assertEquals(333333,
+        ((VectorGroupByOperator.ProcessingModeHashAggregate)vgo.processingMode).getMaxHtEntries());
+
+    this.outputRowCount = 0;
+    out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+      @Override
+      public void inspectRow(Object row, int tag) throws HiveException {
+        ++outputRowCount;
+      }
+    });
+
+    FakeVectorRowBatchFromObjectIterables data = getDataForRollup();

     long countRowsProduced = 0;
+    long numElementsToBeRetained = 0;
+    int avgAccess = 0;
     for (VectorizedRowBatch unit: data) {
-      // after 24 rows, we'd have seen all the keys
-      // find 14 keys in the hashmap
-      // but 24*0.5 = 12
-      // won't turn off hash mode because of the 3 grouping sets
-      // if it turns off the hash mode, we'd get 14 + 3*(100-24) rows
       countRowsProduced += unit.size;
       vgo.process(unit, 0);

       if (countRowsProduced >= 100) {
+        // note down the avg access
+        avgAccess = processingMode.computeAvgAccess();
+        numElementsToBeRetained = getElementsHigherThan(processingMode.mapKeysAggregationBuffers, avgAccess);
+        // trigger the flush explicitly on the next iteration
+        processingMode.gcCanary.clear();
         break;
       }
+    }
+
+    // This processing pass triggers the flush
+    for (VectorizedRowBatch unit: data) {
+      vgo.process(unit, 0);
+      long freqElementsAfterFlush = getElementsHigherThan(processingMode.mapKeysAggregationBuffers, avgAccess);
+      assertTrue("After flush: " + freqElementsAfterFlush + ", before flush: " + numElementsToBeRetained,
+          (freqElementsAfterFlush >= numElementsToBeRetained));
+      break;
     }

     vgo.close(false);
-    // all groupings
-    // 10 keys generates 14 rows with the rollup
-    assertEquals(1+3+10, outputRowCount);
+  }
+
+  long getElementsHigherThan(Map<KeyWrapper, VectorAggregationBufferRow> aggMap, int avgAccess) {
+    return aggMap.values().stream().filter(v -> (v.getAccessCount() > avgAccess)).count();
   }

   @Test
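A note on the 333333 assertion in the new test: ProcessingModeHashAggregate starts from hive.vectorized.groupby.maxentries (default 1,000,000), and the change in this commit divides that cap by the number of grouping sets, so with the rollup's 3 grouping sets maxHtEntries becomes 1,000,000 / 3 = 333,333 by integer division. Scaling the budget down this way keeps a rollup query, which inserts one hash-table entry per grouping set per key, from multiplying the operator's effective memory footprint.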