Repository: hive Updated Branches: refs/heads/branch-3 b3a424bd0 -> 8703a3229
HIVE-20648: LLAP: Vector group by operator should use memory per executor Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8703a322 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8703a322 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8703a322 Branch: refs/heads/branch-3 Commit: 8703a3229e5a8d4afdd0e2ebd06579df40b01ed8 Parents: b3a424b Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Tue Oct 9 11:05:55 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Tue Oct 9 11:06:28 2018 -0700 ---------------------------------------------------------------------- .../ql/exec/vector/VectorGroupByOperator.java | 24 +++-- .../exec/vector/TestVectorGroupByOperator.java | 96 ++++++++++++++++++++ 2 files changed, 112 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8703a322/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 43f1162..4dfd179 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -33,6 +33,8 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapUtil; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.IConfigureJobConf; @@ 
-146,6 +148,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> private float memoryThreshold; + private boolean isLlap = false; /** * Interface for processing mode: global, hash, unsorted streaming, or group batch */ @@ -515,7 +518,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> aggregationBatchInfo.getAggregatorsFixedSize(); MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); + maxMemory = isLlap ? getConf().getMaxMemoryAvailable() : memoryMXBean.getHeapMemoryUsage().getMax(); memoryThreshold = conf.getMemoryThreshold(); // Tests may leave this unitialized, so better set it to 1 if (memoryThreshold == 0.0f) { @@ -525,13 +528,14 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> maxHashTblMemory = (int)(maxMemory * memoryThreshold); if (LOG.isDebugEnabled()) { - LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)", - maxHashTblMemory/1024/1024, - maxMemory/1024/1024, - memoryThreshold, - fixedHashEntrySize, - keyWrappersBatch.getKeysFixedSize(), - aggregationBatchInfo.getAggregatorsFixedSize())); + LOG.debug("GBY memory limits - isLlap: {} maxMemory: {} ({} * {}) fixSize:{} (key:{} agg:{})", + isLlap, + LlapUtil.humanReadableByteCount(maxHashTblMemory), + LlapUtil.humanReadableByteCount(maxMemory), + memoryThreshold, + fixedHashEntrySize, + keyWrappersBatch.getKeysFixedSize(), + aggregationBatchInfo.getAggregatorsFixedSize()); } } @@ -975,6 +979,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); + isLlap = LlapProxy.isDaemon(); VectorExpression.doTransientInit(keyExpressions); List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>(); @@ -1231,4 +1236,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> } } + public long getMaxMemory() { + return 
maxMemory; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/8703a322/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java index ffdc410..e2a593f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java @@ -38,6 +38,8 @@ import java.util.Set; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; +import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -285,6 +287,8 @@ public class TestVectorGroupByOperator { FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo); vgo.initialize(hconf, null); + long expected = vgo.getMaxMemory(); + assertEquals(expected, maxMemory); this.outputRowCount = 0; out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() { @Override @@ -342,6 +346,98 @@ public class TestVectorGroupByOperator { } @Test + public void testMemoryPressureFlushLlap() throws HiveException { + + try { + List<String> mapColumnNames = new ArrayList<String>(); + mapColumnNames.add("Key"); + mapColumnNames.add("Value"); + VectorizationContext ctx = new VectorizationContext("name", mapColumnNames); + + Pair<GroupByDesc, VectorGroupByDesc> pair = buildKeyGroupByDesc(ctx, "max", + "Value", TypeInfoFactory.longTypeInfo, + "Key", TypeInfoFactory.longTypeInfo); + GroupByDesc desc = pair.fst; + VectorGroupByDesc vectorDesc = pair.snd; + + 
LlapProxy.setDaemon(true); + + CompilationOpContext cCtx = new CompilationOpContext(); + + Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc); + + VectorGroupByOperator vgo = + (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc); + + FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo); + long maxMemory=512*1024*1024L; + vgo.getConf().setMaxMemoryAvailable(maxMemory); + float threshold = 100.0f*1024.0f/maxMemory; + desc.setMemoryThreshold(threshold); + vgo.initialize(hconf, null); + + long got = vgo.getMaxMemory(); + assertEquals(maxMemory, got); + this.outputRowCount = 0; + out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() { + @Override + public void inspectRow(Object row, int tag) throws HiveException { + ++outputRowCount; + } + }); + + Iterable<Object> it = new Iterable<Object>() { + @Override + public Iterator<Object> iterator() { + return new Iterator<Object>() { + long value = 0; + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Object next() { + return ++value; + } + + @Override + public void remove() { + } + }; + } + }; + + FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables( + 100, + new String[]{"long", "long"}, + it, + it); + + // The 'it' data source will produce data w/o ever ending + // We want to see that memory pressure kicks in and some + // entries in the VGBY are flushed. 
+ long countRowsProduced = 0; + for (VectorizedRowBatch unit : data) { + countRowsProduced += 100; + vgo.process(unit, 0); + if (0 < outputRowCount) { + break; + } + // Set an upper bound how much we're willing to push before it should flush + // we've set the memory threshold at 100kb, each key is distinct + // It should not go beyond 100k/16 (key+data) + assertTrue(countRowsProduced < 100 * 1024 / 16); + } + + assertTrue(0 < outputRowCount); + } finally { + LlapProxy.setDaemon(false); + } + } + + @Test public void testMultiKeyIntStringInt() throws HiveException { testMultiKey( "sum",