Repository: hive Updated Branches: refs/heads/master 2c445cc8d -> 99fa337b0
HIVE-11445 : CBO: Calcite Operator To Hive Operator (Calcite Return Path) : groupby distinct does not work (Jesus Camacho Rodriguez, reviewed by Pengcheng Xiong) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/99fa337b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/99fa337b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/99fa337b Branch: refs/heads/master Commit: 99fa337b0b146be984fc49d52ecb1a3494164082 Parents: 2c445cc Author: Jesus Camacho Rodriguez <jcama...@apache.org> Authored: Wed Sep 30 09:32:27 2015 +0100 Committer: Jesus Camacho Rodriguez <jcama...@apache.org> Committed: Thu Oct 1 10:10:05 2015 +0100 ---------------------------------------------------------------------- .../calcite/translator/HiveGBOpConvUtil.java | 43 ++-- .../cbo_rp_gby2_map_multi_distinct.q | 38 +++ .../cbo_rp_gby2_map_multi_distinct.q.out | 236 +++++++++++++++++++ 3 files changed, 299 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/99fa337b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java index a0e2e67..a129cf3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java @@ -40,15 +40,14 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID; import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter.OpAttr; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.GenericUDAFInfo; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -70,12 +69,17 @@ import com.google.common.collect.ImmutableList; * external names if possible.<br> * 3. In ExprNode & in ColumnInfo the tableAlias/VirtualColumn is specified * differently for different GB/RS in pipeline. Remove the different treatments. - * 3. VirtualColMap needs to be maintained + * 4. VirtualColMap needs to be maintained * */ public class HiveGBOpConvUtil { + private static enum HIVEGBPHYSICALMODE { - MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB, MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB, MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT, MAP_SIDE_GB_SKEW_GBKEYS_AND_DIST_UDAF_NOT_PRESENT, NO_MAP_SIDE_GB_NO_SKEW, NO_MAP_SIDE_GB_SKEW + MAP_SIDE_GB_NO_SKEW_NO_ADD_MR_JOB, + MAP_SIDE_GB_NO_SKEW_ADD_MR_JOB, + MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT, + MAP_SIDE_GB_SKEW_GBKEYS_AND_DIST_UDAF_NOT_PRESENT, + NO_MAP_SIDE_GB_NO_SKEW, NO_MAP_SIDE_GB_SKEW }; private static class UDAFAttrs { @@ -94,8 +98,8 @@ public class HiveGBOpConvUtil { private final List<ExprNodeDesc> gbKeys = new ArrayList<ExprNodeDesc>(); private final List<Integer> grpSets = new ArrayList<Integer>(); - private boolean grpSetRqrAdditionalMRJob; - private boolean grpIdFunctionNeeded; + private boolean grpSetRqrAdditionalMRJob; + private boolean grpIdFunctionNeeded; private final List<String> distExprNames = new ArrayList<String>(); private final List<TypeInfo> distExprTypes = new ArrayList<TypeInfo>(); @@ -105,12 +109,12 @@ public class HiveGBOpConvUtil { private final List<ExprNodeDesc> deDupedNonDistIrefs = new ArrayList<ExprNodeDesc>(); private final List<UDAFAttrs> udafAttrs = new ArrayList<UDAFAttrs>(); - private boolean containsDistinctAggr = false; + private boolean containsDistinctAggr = false; - float groupByMemoryUsage; - float memoryThreshold; + float groupByMemoryUsage; + float memoryThreshold; - private HIVEGBPHYSICALMODE gbPhysicalPipelineMode; + private HIVEGBPHYSICALMODE gbPhysicalPipelineMode; }; private static HIVEGBPHYSICALMODE getAggOPMode(HiveConf hc, GBInfo gbInfo) { @@ -203,11 +207,14 @@ public class HiveGBOpConvUtil { for (int i = 0; i < argLst.size(); i++) { if (!distinctRefs.contains(argLst.get(i))) { distinctRefs.add(argLst.get(i)); - distParamInRefsToOutputPos.put(argLst.get(i), gbInfo.distExprNodes.size()); distinctExpr = HiveCalciteUtil.getExprNode(argLst.get(i), aggInputRel, exprConv); - gbInfo.distExprNodes.add(distinctExpr); - gbInfo.distExprNames.add(argNames.get(i)); - gbInfo.distExprTypes.add(distinctExpr.getTypeInfo()); + // Only distinct nodes that are NOT part of the key should be added to distExprNodes + if (ExprNodeDescUtils.indexOf(distinctExpr, gbInfo.gbKeys) < 0) { + distParamInRefsToOutputPos.put(argLst.get(i), gbInfo.distExprNodes.size()); + gbInfo.distExprNodes.add(distinctExpr); + gbInfo.distExprNames.add(argNames.get(i)); + gbInfo.distExprTypes.add(distinctExpr.getTypeInfo()); + } } } } @@ -254,10 +261,10 @@ public class HiveGBOpConvUtil { } // special handling for count, similar to PlanModifierForASTConv::replaceEmptyGroupAggr() - udafAttrs.udafEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator(udafAttrs.udafName, - new ArrayList<ExprNodeDesc>(udafAttrs.udafParams), new ASTNode(), - udafAttrs.isDistinctUDAF, udafAttrs.udafParams.size() == 0 && - "count".equalsIgnoreCase(udafAttrs.udafName) ? true : false); + udafAttrs.udafEvaluator = SemanticAnalyzer.getGenericUDAFEvaluator(udafAttrs.udafName, + new ArrayList<ExprNodeDesc>(udafAttrs.udafParams), new ASTNode(), + udafAttrs.isDistinctUDAF, udafAttrs.udafParams.size() == 0 && + "count".equalsIgnoreCase(udafAttrs.udafName) ? true : false); gbInfo.udafAttrs.add(udafAttrs); } http://git-wip-us.apache.org/repos/asf/hive/blob/99fa337b/ql/src/test/queries/clientpositive/cbo_rp_gby2_map_multi_distinct.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/cbo_rp_gby2_map_multi_distinct.q b/ql/src/test/queries/clientpositive/cbo_rp_gby2_map_multi_distinct.q new file mode 100644 index 0000000..28f1f81 --- /dev/null +++ b/ql/src/test/queries/clientpositive/cbo_rp_gby2_map_multi_distinct.q @@ -0,0 +1,38 @@ +set hive.cbo.enable=true; +set hive.cbo.returnpath.hiveop=true; + +set hive.map.aggr=true; +set hive.groupby.skewindata=false; +set mapred.reduce.tasks=31; + +-- SORT_QUERY_RESULTS + +CREATE TABLE dest1(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE; + +EXPLAIN +FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1); + +FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1); + +SELECT dest1.* FROM dest1; + +-- HIVE-5560 when group by key is used in distinct funtion, invalid result are returned + +EXPLAIN +FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.key,1,1)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1); + +FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.key,1,1)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1); + +SELECT dest1.* FROM dest1; http://git-wip-us.apache.org/repos/asf/hive/blob/99fa337b/ql/src/test/results/clientpositive/cbo_rp_gby2_map_multi_distinct.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/cbo_rp_gby2_map_multi_distinct.q.out b/ql/src/test/results/clientpositive/cbo_rp_gby2_map_multi_distinct.q.out new file mode 100644 index 0000000..8592d6c --- /dev/null +++ b/ql/src/test/results/clientpositive/cbo_rp_gby2_map_multi_distinct.q.out @@ -0,0 +1,236 @@ +PREHOOK: query: -- SORT_QUERY_RESULTS + +CREATE TABLE dest1(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dest1 +POSTHOOK: query: -- SORT_QUERY_RESULTS + +CREATE TABLE dest1(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dest1 +PREHOOK: query: EXPLAIN +FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + Stage-2 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: substr(key, 1, 1) (type: string), substr(value, 5) (type: string), value (type: string) + outputColumnNames: $f0, $f1, $f2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(DISTINCT $f1), sum($f1), sum(DISTINCT $f1), count($f2) + keys: $f0 (type: string), $f1 (type: string) + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + value expressions: _col3 (type: double), _col5 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(DISTINCT KEY._col1:0._col0), sum(VALUE._col1), sum(DISTINCT KEY._col1:1._col0), count(VALUE._col3) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: $f0, $f1, $f2, $f3, $f4 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: $f0 (type: string), UDFToInteger($f1) (type: int), concat($f0, $f2) (type: string), UDFToInteger($f3) (type: int), UDFToInteger($f4) (type: int) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-2 + Stats-Aggr Operator + +PREHOOK: query: FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +POSTHOOK: query: FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.c1 EXPRESSION [(src)src.null, ] +POSTHOOK: Lineage: dest1.c2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.c3 EXPRESSION [(src)src.null, ] +POSTHOOK: Lineage: dest1.c4 EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: SELECT dest1.* FROM dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.* FROM dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +0 1 00.0 0 3 +1 71 116414.0 10044 115 +2 69 225571.0 15780 111 +3 62 332004.0 20119 99 +4 74 452763.0 30965 124 +5 6 5397.0 278 10 +6 5 6398.0 331 6 +7 6 7735.0 447 10 +8 8 8762.0 595 10 +9 7 91047.0 577 12 +PREHOOK: query: -- HIVE-5560 when group by key is used in distinct funtion, invalid result are returned + +EXPLAIN +FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.key,1,1)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1) +PREHOOK: type: QUERY +POSTHOOK: query: -- HIVE-5560 when group by key is used in distinct funtion, invalid result are returned + +EXPLAIN +FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.key,1,1)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + Stage-2 depends on stages: Stage-0 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: substr(key, 1, 1) (type: string), substr(value, 5) (type: string), value (type: string) + outputColumnNames: $f0, $f1, $f2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(DISTINCT $f0), sum($f1), sum(DISTINCT $f1), count($f2) + keys: $f0 (type: string), $f1 (type: string) + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + value expressions: _col3 (type: double), _col5 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(DISTINCT KEY._col1:0._col0), sum(VALUE._col1), sum(DISTINCT KEY._col1:1._col0), count(VALUE._col3) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: $f0, $f1, $f2, $f3, $f4 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: $f0 (type: string), UDFToInteger($f1) (type: int), concat($f0, $f2) (type: string), UDFToInteger($f3) (type: int), UDFToInteger($f4) (type: int) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-0 + Move Operator + tables: + replace: true + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.dest1 + + Stage: Stage-2 + Stats-Aggr Operator + +PREHOOK: query: FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.key,1,1)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1) +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@dest1 +POSTHOOK: query: FROM src +INSERT OVERWRITE TABLE dest1 +SELECT substr(src.key,1,1), count(DISTINCT substr(src.key,1,1)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) +GROUP BY substr(src.key,1,1) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@dest1 +POSTHOOK: Lineage: dest1.c1 EXPRESSION [(src)src.null, ] +POSTHOOK: Lineage: dest1.c2 EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), (src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.c3 EXPRESSION [(src)src.null, ] +POSTHOOK: Lineage: dest1.c4 EXPRESSION [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: dest1.key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +PREHOOK: query: SELECT dest1.* FROM dest1 +PREHOOK: type: QUERY +PREHOOK: Input: default@dest1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT dest1.* FROM dest1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dest1 +#### A masked pattern was here #### +0 1 00.0 0 3 +1 1 116414.0 10044 115 +2 1 225571.0 15780 111 +3 1 332004.0 20119 99 +4 1 452763.0 30965 124 +5 1 5397.0 278 10 +6 1 6398.0 331 6 +7 1 7735.0 447 10 +8 1 8762.0 595 10 +9 1 91047.0 577 12