This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 0fd99df  HIVE-23356 : Hash aggregation is always disabled while processing queries with grouping sets expressions. (Qiang Kang via Ashutosh Chauhan)
0fd99df is described below

commit 0fd99df99dc07540d8818d179bcdcb2972f09752
Author: Qiang Kang <tka...@gmail.com>
AuthorDate: Sat May 2 12:12:06 2020 -0700

    HIVE-23356 : Hash aggregation is always disabled while processing queries with grouping sets expressions. (Qiang Kang via Ashutosh Chauhan)

    Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>
---
 .../hadoop/hive/ql/exec/GroupByOperator.java       |  17 +--
 .../apache/hadoop/hive/ql/exec/TestOperators.java  | 131 +++++++++++++++++++++
 2 files changed, 140 insertions(+), 8 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 7220f33..b94e3fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -739,20 +739,21 @@ public class GroupByOperator extends Operator<GroupByDesc> implements IConfigureJobConf
       // if hash aggregation is not behaving properly, disable it
       if (numRowsInput == numRowsCompareHashAggr) {
         numRowsCompareHashAggr += groupbyMapAggrInterval;
+        long numRowsProcessed = groupingSetsPresent ? numRowsInput * groupingSets.size() : numRowsInput;
         // map-side aggregation should reduce the entries by at-least half
-        if (numRowsHashTbl > numRowsInput * minReductionHashAggr) {
+        if (numRowsHashTbl > numRowsProcessed * minReductionHashAggr) {
           LOG.warn("Disable Hash Aggr: #hash table = " + numRowsHashTbl
-              + " #total = " + numRowsInput + " reduction = " + 1.0
-              * (numRowsHashTbl / numRowsInput) + " minReduction = "
-              + minReductionHashAggr);
+              + " #numRowsInput = " + numRowsInput + " reduction = " + 1.0 * (numRowsHashTbl / numRowsProcessed)
+              + " minReduction = " + minReductionHashAggr + " groupingSetsPresent = " + groupingSetsPresent
+              + " numRowsProcessed = " + numRowsProcessed);
           flushHashTable(true);
           hashAggr = false;
         } else {
           if (LOG.isTraceEnabled()) {
-            LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
-                + " #total = " + numRowsInput + " reduction = " + 1.0
-                * (numRowsHashTbl / numRowsInput) + " minReduction = "
-                + minReductionHashAggr);
+            LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl + " #numRowsInput = " + numRowsInput
+                + " reduction = " + 1.0 * (numRowsHashTbl / numRowsProcessed) + " minReduction = "
+                + minReductionHashAggr + " groupingSetsPresent = " + groupingSetsPresent + " numRowsProcessed = "
+                + numRowsProcessed);
           }
         }
       }
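For context on the one-line fix above: with grouping sets present, GroupByOperator evaluates each input row once per grouping set, so after N input rows the hash table can legitimately hold up to N * groupingSets.size() entries. The old check compared the table size against N alone, so with two or more grouping sets the "reduce the entries by at-least half" condition could never be met and map-side hash aggregation always switched itself off. A minimal, self-contained sketch of the corrected check follows; the variable names mirror the diff, but the values are illustrative and the class is not part of the patch:

    public class ReductionCheckSketch {
      public static void main(String[] args) {
        long numRowsInput = 100;        // input rows seen so far
        long numRowsHashTbl = 150;      // distinct entries currently in the hash table
        int numGroupingSets = 3;        // e.g. GROUPING SETS ((a), (b), (a, b))
        boolean groupingSetsPresent = true;
        float minReductionHashAggr = 0.5f;

        // Each input row yields one hash-table entry per grouping set, so the
        // fix measures reduction against rows processed, not rows read.
        long numRowsProcessed = groupingSetsPresent ? numRowsInput * numGroupingSets : numRowsInput;

        boolean oldCheck = numRowsHashTbl > numRowsInput * minReductionHashAggr;     // 150 > 50  -> disable
        boolean newCheck = numRowsHashTbl > numRowsProcessed * minReductionHashAggr; // 150 > 150 -> keep
        System.out.println("old check would disable hash aggregation: " + oldCheck);
        System.out.println("new check would disable hash aggregation: " + newCheck);
      }
    }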
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 8a0606b..3c0a7eb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -32,14 +32,18 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOContextMap;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.type.ExprNodeTypeCheck;
 import org.apache.hadoop.hive.ql.parse.type.TypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -49,6 +53,7 @@ import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -525,4 +530,130 @@ public class TestOperators {
     assertEquals(5,
         convertJoinMapJoin.getMemoryMonitorInfo(hiveConf, null).getMaxExecutorsOverSubscribeMemory());
   }
+
+  @Test public void testHashGroupBy() throws HiveException {
+    InspectableObject[] input = constructHashAggrInputData(5, 3);
+    System.out.println("---------------Begin to Construct Groupby Desc-------------");
+    // 1. Build AggregationDesc
+    String aggregate = "MAX";
+    ExprNodeDesc inputColumn = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "col0", "table", false);
+    ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
+    params.add(inputColumn);
+    GenericUDAFEvaluator genericUDAFEvaluator =
+        SemanticAnalyzer.getGenericUDAFEvaluator(aggregate, params, null, false, false);
+    AggregationDesc agg =
+        new AggregationDesc(aggregate, genericUDAFEvaluator, params, false, GenericUDAFEvaluator.Mode.PARTIAL1);
+    ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
+    aggs.add(agg);
+
+    // 2. aggr keys
+    ExprNodeDesc key1 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "col1", "table", false);
+    ExprNodeDesc key2 = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "col2", "table", false);
+    ArrayList<ExprNodeDesc> keys = new ArrayList<>();
+    keys.add(key1);
+    keys.add(key2);
+
+    // 3. outputCols
+    // @see org.apache.hadoop.hive.ql.exec.GroupByOperator.forward
+    // outputColumnNames, including: group by keys, agg evaluators output cols.
+    ArrayList<String> outputColumnNames = new ArrayList<String>();
+    for (int i = 0; i < keys.size() + aggs.size(); i++) {
+      outputColumnNames.add("_col" + i);
+    }
+    // 4. build GroupByDesc desc
+    GroupByDesc desc = new GroupByDesc();
+    desc.setOutputColumnNames(outputColumnNames);
+    desc.setAggregators(aggs);
+    desc.setKeys(keys);
+    desc.setMode(GroupByDesc.Mode.HASH);
+    desc.setMemoryThreshold(1.0f);
+    desc.setGroupByMemoryUsage(1.0f);
+    // minReductionHashAggr
+    desc.setMinReductionHashAggr(0.5f);
+
+    // 5. Configure hive conf and Build group by operator
+    HiveConf hconf = new HiveConf();
+    HiveConf.setIntVar(hconf, HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL, 1);
+
+    // 6. test hash aggr without grouping sets
+    System.out.println("---------------Begin to test hash group by without grouping sets-------------");
+    int withoutGroupingSetsExpectSize = 3;
+    GroupByOperator op = new GroupByOperator(new CompilationOpContext());
+    op.setConf(desc);
+    testHashAggr(op, hconf, input, withoutGroupingSetsExpectSize);
+
+    // 7. test hash aggr with grouping sets
+    System.out.println("---------------Begin to test hash group by with grouping sets------------");
+    int groupingSetsExpectSize = 6;
+
+    desc.setGroupingSetsPresent(true);
+    ArrayList<Long> groupingSets = new ArrayList<>();
+    // groupingSets
+    groupingSets.add(1L);
+    groupingSets.add(2L);
+    desc.setListGroupingSets(groupingSets);
+    // add grouping sets dummy key
+    ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+    keys.add(groupingSetDummyKey);
+    desc.setKeys(keys);
+    // groupingSet Position
+    desc.setGroupingSetPosition(2);
+    op = new GroupByOperator(new CompilationOpContext());
+    op.setConf(desc);
+    testHashAggr(op, hconf, input, groupingSetsExpectSize);
+  }
+
+  private void testHashAggr(GroupByOperator op, HiveConf hconf, InspectableObject[] r, int expectOutputSize)
+      throws HiveException {
+    // 1. Collect operator to observe the output of the group by operator
+    CollectDesc cd = new CollectDesc(expectOutputSize + 10);
+    CollectOperator cdop = (CollectOperator) OperatorFactory.getAndMakeChild(cd, op);
+    op.initialize(hconf, new ObjectInspector[] { r[0].oi });
+    // 2. Evaluate on rows and check hashAggr flag
+    for (int i = 0; i < r.length; i++) {
+      op.process(r[i].o, 0);
+    }
+    op.close(false);
+    InspectableObject io = new InspectableObject();
+    int output = 0;
+    // 3. Print group by results
+    do {
+      cdop.retrieve(io);
+      if (io.o != null) {
+        System.out.println("io.o = " + io.o);
+        output++;
+      }
+    } while (io.o != null);
+    // 4. Check partial result size
+    assertEquals(expectOutputSize, output);
+  }
+
+  private InspectableObject[] constructHashAggrInputData(int rowNum, int rowNumWithSameKeys) {
+    InspectableObject[] r;
+    r = new InspectableObject[rowNum];
+    ArrayList<String> names = new ArrayList<String>(3);
+    names.add("col0");
+    names.add("col1");
+    names.add("col2");
+    ArrayList<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>(3);
+    objectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    objectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    objectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    // the first rowNumWithSameKeys rows share the same col1, col2
+    for (int i = 0; i < rowNum; i++) {
+      ArrayList<String> data = new ArrayList<String>();
+      data.add("" + i);
+      data.add("" + (i < rowNumWithSameKeys ? -1 : i));
+      data.add("" + (i < rowNumWithSameKeys ? -1 : i));
+      try {
+        r[i] = new InspectableObject();
+        r[i].o = data;
+        r[i].oi = ObjectInspectorFactory.getStandardStructObjectInspector(names, objectInspectors);
+      } catch (Throwable e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return r;
+  }
 }
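A note on the expected output sizes in testHashGroupBy, as I read the fixture above (nothing here is additional test code): constructHashAggrInputData(5, 3) builds five rows whose first three share (col1, col2) = (-1, -1), i.e. three distinct key pairs, so the plain hash group-by is expected to emit 3 partial results. With the two grouping sets, every group is forwarded once per set, giving 3 * 2 = 6. Because HIVEGROUPBYMAPINTERVAL is set to 1, the reduction check fires after the very first row; under the old code that row had already produced 2 hash-table entries against 1 input row (2 > 1 * 0.5), so hash aggregation was disabled and the operator would have emitted roughly one partial per processed row instead of 6, failing the assertEquals(expectOutputSize, output) check that this patch makes pass.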