HIVE-11605: Incorrect results with bucket map join in tez. (Vikram Dixit K, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4ea8e296
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4ea8e296
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4ea8e296

Branch: refs/heads/llap
Commit: 4ea8e29619eb0bbb02e3f7c09ffc9d44bf4cdfef
Parents: 594e25a
Author: vikram <vik...@hortonworks.com>
Authored: Thu Sep 10 13:13:56 2015 -0700
Committer: vikram <vik...@hortonworks.com>
Committed: Thu Sep 10 13:30:23 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |  18 ++-
 .../ql/optimizer/ReduceSinkMapJoinProc.java     |   8 +-
 .../clientpositive/bucket_map_join_tez1.q       |   9 ++
 .../spark/bucket_map_join_tez1.q.out            | 131 +++++++++++++++++++
 .../tez/bucket_map_join_tez1.q.out              | 123 +++++++++++++++++
 5 files changed, 280 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index e3acdfc..8ea1879 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -375,13 +375,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       }
       ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
       if 
(checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), 
rsOp
-          .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), 
tezBucketJoinProcCtx) == false) {
+          .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), 
tezBucketJoinProcCtx, false) == false) {
         LOG.info("We cannot convert to SMB because the sort column names do 
not match.");
         return false;
       }
 
       if 
(checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(),
 rsOp
-          .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), 
tezBucketJoinProcCtx)
+          .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), 
tezBucketJoinProcCtx, true)
           == false) {
         LOG.info("We cannot convert to SMB because bucket column names do not 
match.");
         return false;
@@ -428,7 +428,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
     // all keys matched.
     if (checkColEquality(grandParentColNames, parentColNames, 
rs.getColumnExprMap(),
-        tezBucketJoinProcCtx) == false) {
+        tezBucketJoinProcCtx, true) == false) {
       LOG.info("No info available to check for bucket map join. Cannot 
convert");
       return false;
     }
@@ -446,7 +446,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
   private boolean checkColEquality(List<List<String>> grandParentColNames,
       List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap,
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) {
+      TezBucketJoinProcCtx tezBucketJoinProcCtx, boolean strict) {
 
     if ((grandParentColNames == null) || (parentColNames == null)) {
       return false;
@@ -479,7 +479,15 @@ public class ConvertJoinMapJoin implements NodeProcessor {
           }
 
           if (colCount == parentColNames.get(0).size()) {
-            return true;
+            if (strict) {
+              if (colCount == listBucketCols.size()) {
+                return true;
+              } else {
+                return false;
+              }
+            } else {
+              return true;
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index b546838..71c766f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -226,10 +226,6 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
     int numBuckets = -1;
     EdgeType edgeType = EdgeType.BROADCAST_EDGE;
     if (joinConf.isBucketMapJoin()) {
-
-      // disable auto parallelism for bucket map joins
-      parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));
-
       numBuckets = (Integer) 
joinConf.getBigTableBucketNumMapping().values().toArray()[0];
       /*
        * Here, we can be in one of 4 states.
@@ -273,6 +269,10 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
     } else if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
       edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
     }
+    if (edgeType == EdgeType.CUSTOM_EDGE) {
+      // disable auto parallelism for bucket map joins
+      parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));
+    }
     TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets);
 
     if (mapJoinWork != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
index 4a7d63e..0f9dd6d 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
@@ -30,6 +30,15 @@ explain
 select a.key, a.value, b.value
 from tab a join tab_part b on a.key = b.key;
 
+explain
+select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key;
+
+select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key;
+
 -- one side is really bucketed. srcbucket_mapjoin is not really a bucketed 
table.
 -- In this case the sub-query is chosen as the big table.
 explain

http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out b/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
index 65bded2..34ddc90 100644
--- a/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
+++ b/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
@@ -183,6 +183,137 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: explain
+select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
+                    Spark HashTable Sink Operator
+                      keys:
+                        0 _col0 (type: int)
+                        1 key (type: int)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 3 <- Reducer 2 (GROUP, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: tab_part
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: 
COMPLETE Column stats: NONE
+                    Group By Operator
+                      keys: key (type: int), value (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: 
COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: 
string)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: int), _col1 
(type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: 
COMPLETE Column stats: NONE
+        Reducer 2 
+            Local Work:
+              Map Reduce Local Work
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: int), KEY._col1 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 125 Data size: 1328 Basic stats: 
COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 125 Data size: 1328 Basic stats: 
COMPLETE Column stats: NONE
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    keys:
+                      0 _col0 (type: int)
+                      1 key (type: int)
+                    input vertices:
+                      1 Map 4
+                    Statistics: Num rows: 137 Data size: 1460 Basic stats: 
COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: count()
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+242
 PREHOOK: query: -- one side is really bucketed. srcbucket_mapjoin is not 
really a bucketed table.
 -- In this case the sub-query is chosen as the big table.
 explain

http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out b/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out
index 61c197f..8338672 100644
--- a/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out
+++ b/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out
@@ -178,6 +178,129 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: explain
+select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (BROADCAST_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: tab_part
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: 
COMPLETE Column stats: NONE
+                    Group By Operator
+                      keys: key (type: int), value (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: 
COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: 
string)
+                        sort order: ++
+                        Map-reduce partition columns: _col0 (type: int), _col1 
(type: string)
+                        Statistics: Num rows: 250 Data size: 2656 Basic stats: 
COMPLETE Column stats: NONE
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: 
COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: key (type: int)
+                      sort order: +
+                      Map-reduce partition columns: key (type: int)
+                      Statistics: Num rows: 121 Data size: 1283 Basic stats: 
COMPLETE Column stats: NONE
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: int), KEY._col1 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 125 Data size: 1328 Basic stats: 
COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col0 (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 125 Data size: 1328 Basic stats: 
COMPLETE Column stats: NONE
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    keys:
+                      0 _col0 (type: int)
+                      1 key (type: int)
+                    input vertices:
+                      1 Map 4
+                    Statistics: Num rows: 137 Data size: 1460 Basic stats: 
COMPLETE Column stats: NONE
+                    HybridGraceHashJoin: true
+                    Group By Operator
+                      aggregations: count()
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: 
COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*)
+from 
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+242
 PREHOOK: query: -- one side is really bucketed. srcbucket_mapjoin is not 
really a bucketed table.
 -- In this case the sub-query is chosen as the big table.
 explain

Reply via email to