HIVE-16132: DataSize stats don't seem correct in semijoin opt branch (Deepak Jaiswal via Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/be47d9e3 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/be47d9e3 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/be47d9e3 Branch: refs/heads/master Commit: be47d9e3fae437c7644e47679119d20b86f8a332 Parents: c76ce91 Author: Gunther Hagleitner <gunt...@apache.org> Authored: Mon Mar 13 11:14:56 2017 -0700 Committer: Gunther Hagleitner <gunt...@apache.org> Committed: Mon Mar 13 11:15:22 2017 -0700 ---------------------------------------------------------------------- .../DynamicPartitionPruningOptimization.java | 57 +- .../clientpositive/dynamic_semijoin_reduction.q | 14 +- .../llap/dynamic_semijoin_reduction.q.out | 870 +++++++++++-------- .../results/clientpositive/llap/mergejoin.q.out | 2 +- 4 files changed, 584 insertions(+), 359 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/be47d9e3/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java index e6f21e9..b6db6aa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java @@ -394,19 +394,18 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { // we need the expr that generated the key of the reduce sink ExprNodeDesc key = ctx.generator.getConf().getKeyCols().get(ctx.desc.getKeyIndex()); - if (parentOfRS instanceof SelectOperator) { - // Make sure the semijoin branch is not on parition column. - String internalColName = null; - ExprNodeDesc exprNodeDesc = key; - // Find the ExprNodeColumnDesc - while (!(exprNodeDesc instanceof ExprNodeColumnDesc) && - (exprNodeDesc.getChildren() != null)) { - exprNodeDesc = exprNodeDesc.getChildren().get(0); - } - - if (exprNodeDesc instanceof ExprNodeColumnDesc) { - internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn(); + String internalColName = null; + ExprNodeDesc exprNodeDesc = key; + // Find the ExprNodeColumnDesc + while (!(exprNodeDesc instanceof ExprNodeColumnDesc) && + (exprNodeDesc.getChildren() != null)) { + exprNodeDesc = exprNodeDesc.getChildren().get(0); + } + if (exprNodeDesc instanceof ExprNodeColumnDesc) { + internalColName = ((ExprNodeColumnDesc) exprNodeDesc).getColumn(); + if (parentOfRS instanceof SelectOperator) { + // Make sure the semijoin branch is not on parition column. ExprNodeColumnDesc colExpr = ((ExprNodeColumnDesc) (parentOfRS. getColumnExprMap().get(internalColName))); String colName = ExprNodeDescUtils.extractColName(colExpr); @@ -423,12 +422,13 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { // The column is partition column, skip the optimization. return false; } - } else { - // No column found! - // Bail out - return false; } + } else { + // No column found! + // Bail out + return false; } + List<ExprNodeDesc> keyExprs = new ArrayList<ExprNodeDesc>(); keyExprs.add(key); @@ -438,9 +438,32 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor { // project the relevant key column SelectDesc select = new SelectDesc(keyExprs, outputNames); + + // Create the new RowSchema for the projected column + ColumnInfo columnInfo = parentOfRS.getSchema().getColumnInfo(internalColName); + ArrayList<ColumnInfo> signature = new ArrayList<ColumnInfo>(); + signature.add(columnInfo); + RowSchema rowSchema = new RowSchema(signature); + + // Create the column expr map + Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>(); + ExprNodeDesc exprNode = null; + if ( parentOfRS.getColumnExprMap() != null) { + exprNode = parentOfRS.getColumnExprMap().get(internalColName).clone(); + } else { + exprNode = new ExprNodeColumnDesc(columnInfo); + } + + if (exprNode instanceof ExprNodeColumnDesc) { + ExprNodeColumnDesc encd = (ExprNodeColumnDesc) exprNode; + encd.setColumn(internalColName); + } + colExprMap.put(internalColName, exprNode); + + // Create the Select Operator SelectOperator selectOp = (SelectOperator) OperatorFactory.getAndMakeChild(select, - new RowSchema(parentOfRS.getSchema()), parentOfRS); + rowSchema, colExprMap, parentOfRS); // do a group by to aggregate min,max and bloom filter. float groupByMemoryUsage = http://git-wip-us.apache.org/repos/asf/hive/blob/be47d9e3/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q index e686af6..5482cdb 100644 --- a/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q +++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_reduction.q @@ -7,6 +7,7 @@ set hive.tez.dynamic.partition.pruning=true; set hive.tez.dynamic.semijoin.reduction=true; set hive.optimize.metadataonly=false; set hive.optimize.index.filter=true; +set hive.stats.autogather=true; -- Create Tables create table alltypesorc_int ( cint int, cstring string ) stored as ORC; @@ -27,6 +28,10 @@ insert overwrite table srcpart_date partition (ds = "2008-04-09") select key, va insert overwrite table srcpart_small partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09"; set hive.tez.dynamic.semijoin.reduction=false; +analyze table alltypesorc_int compute statistics for columns; +analyze table srcpart_date compute statistics for columns; +analyze table srcpart_small compute statistics for columns; + -- single column, single key EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); @@ -52,8 +57,8 @@ set hive.tez.dynamic.semijoin.reduction=false; EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1); select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1); set hive.tez.dynamic.semijoin.reduction=true; -select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1); EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1); +select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1); set hive.tez.dynamic.semijoin.reduction=false; -- multiple sources, different keys @@ -75,6 +80,13 @@ set hive.tez.dynamic.semijoin.reduction=true; EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1); +-- With unions +explain select * from alltypesorc_int join + (select srcpart_date.key as key from srcpart_date + union all + select srcpart_small.key1 as key from srcpart_small) unionsrc on (alltypesorc_int.cstring = unionsrc.key); + + drop table srcpart_date; drop table srcpart_small; drop table alltypesorc_int;