HIVE-11605: Incorrect results with bucket map join in Tez. (Vikram Dixit K, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4ea8e296 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4ea8e296 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4ea8e296 Branch: refs/heads/llap Commit: 4ea8e29619eb0bbb02e3f7c09ffc9d44bf4cdfef Parents: 594e25a Author: vikram <vik...@hortonworks.com> Authored: Thu Sep 10 13:13:56 2015 -0700 Committer: vikram <vik...@hortonworks.com> Committed: Thu Sep 10 13:30:23 2015 -0700 ---------------------------------------------------------------------- .../hive/ql/optimizer/ConvertJoinMapJoin.java | 18 ++- .../ql/optimizer/ReduceSinkMapJoinProc.java | 8 +- .../clientpositive/bucket_map_join_tez1.q | 9 ++ .../spark/bucket_map_join_tez1.q.out | 131 +++++++++++++++++++ .../tez/bucket_map_join_tez1.q.out | 123 +++++++++++++++++ 5 files changed, 280 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index e3acdfc..8ea1879 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -375,13 +375,13 @@ public class ConvertJoinMapJoin implements NodeProcessor { } ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp; if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp - .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) { + .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx, false) == false) { LOG.info("We cannot convert to SMB because the sort 
column names do not match."); return false; } if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp - .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) + .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx, true) == false) { LOG.info("We cannot convert to SMB because bucket column names do not match."); return false; @@ -428,7 +428,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { int numBuckets = parentOfParent.getOpTraits().getNumBuckets(); // all keys matched. if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(), - tezBucketJoinProcCtx) == false) { + tezBucketJoinProcCtx, true) == false) { LOG.info("No info available to check for bucket map join. Cannot convert"); return false; } @@ -446,7 +446,7 @@ public class ConvertJoinMapJoin implements NodeProcessor { private boolean checkColEquality(List<List<String>> grandParentColNames, List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap, - TezBucketJoinProcCtx tezBucketJoinProcCtx) { + TezBucketJoinProcCtx tezBucketJoinProcCtx, boolean strict) { if ((grandParentColNames == null) || (parentColNames == null)) { return false; @@ -479,7 +479,15 @@ public class ConvertJoinMapJoin implements NodeProcessor { } if (colCount == parentColNames.get(0).size()) { - return true; + if (strict) { + if (colCount == listBucketCols.size()) { + return true; + } else { + return false; + } + } else { + return true; + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java index b546838..71c766f 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java @@ -226,10 +226,6 @@ public class ReduceSinkMapJoinProc implements NodeProcessor { int numBuckets = -1; EdgeType edgeType = EdgeType.BROADCAST_EDGE; if (joinConf.isBucketMapJoin()) { - - // disable auto parallelism for bucket map joins - parentRS.getConf().setReducerTraits(EnumSet.of(FIXED)); - numBuckets = (Integer) joinConf.getBigTableBucketNumMapping().values().toArray()[0]; /* * Here, we can be in one of 4 states. @@ -273,6 +269,10 @@ public class ReduceSinkMapJoinProc implements NodeProcessor { } else if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) { edgeType = EdgeType.CUSTOM_SIMPLE_EDGE; } + if (edgeType == EdgeType.CUSTOM_EDGE) { + // disable auto parallelism for bucket map joins + parentRS.getConf().setReducerTraits(EnumSet.of(FIXED)); + } TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets); if (mapJoinWork != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q index 4a7d63e..0f9dd6d 100644 --- a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q +++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q @@ -30,6 +30,15 @@ explain select a.key, a.value, b.value from tab a join tab_part b on a.key = b.key; +explain +select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key; + +select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key; + -- one side is really bucketed. srcbucket_mapjoin is not really a bucketed table. -- In this case the sub-query is chosen as the big table. 
explain http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out b/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out index 65bded2..34ddc90 100644 --- a/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out +++ b/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out @@ -183,6 +183,137 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: explain +select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-1 depends on stages: Stage-2 + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-2 + Spark +#### A masked pattern was here #### + Vertices: + Map 4 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Spark HashTable Sink Operator + keys: + 0 _col0 (type: int) + 1 key (type: int) + Local Work: + Map Reduce Local Work + + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (GROUP, 2) + Reducer 3 <- Reducer 2 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tab_part + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: key (type: int), value 
(type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: int), _col1 (type: string) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Local Work: + Map Reduce Local Work + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: int), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 key (type: int) + input vertices: + 1 Map 4 + Statistics: Num rows: 137 Data size: 1460 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + 
Processor Tree: + ListSink + +PREHOOK: query: select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +242 PREHOOK: query: -- one side is really bucketed. srcbucket_mapjoin is not really a bucketed table. -- In this case the sub-query is chosen as the big table. explain http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out b/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out index 61c197f..8338672 100644 --- a/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out +++ b/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out @@ -178,6 +178,129 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: explain +select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (BROADCAST_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE) +#### A masked pattern was here #### + 
Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tab_part + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: key (type: int), value (type: string) + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: int), _col1 (type: string) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Map 4 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: int) + sort order: + + Map-reduce partition columns: key (type: int) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: int), KEY._col1 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 key (type: int) + input vertices: + 1 Map 4 + Statistics: Num rows: 137 Data size: 1460 Basic stats: COMPLETE Column stats: NONE + HybridGraceHashJoin: true + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 3 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +PREHOOK: Input: default@tab_part +PREHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) +from +(select distinct key, value from tab_part) a join tab b on a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +POSTHOOK: Input: default@tab_part +POSTHOOK: Input: default@tab_part@ds=2008-04-08 +#### A masked pattern was here #### +242 PREHOOK: query: -- one side is really bucketed. srcbucket_mapjoin is not really a bucketed table. -- In this case the sub-query is chosen as the big table. explain