Repository: hive Updated Branches: refs/heads/master dd04a92f7 -> b67d52c29
HIVE-17276: Check max shuffle size when converting to dynamically partitioned hash join (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b67d52c2 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b67d52c2 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b67d52c2 Branch: refs/heads/master Commit: b67d52c294c1c2db21ed89c86486c946aa5d3ca4 Parents: dd04a92 Author: Jesus Camacho Rodriguez <jcama...@apache.org> Authored: Tue Aug 29 10:36:29 2017 -0700 Committer: Jesus Camacho Rodriguez <jcama...@apache.org> Committed: Wed Aug 30 08:29:33 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 7 +- .../hive/ql/optimizer/ConvertJoinMapJoin.java | 55 +++++-- .../queries/clientpositive/join_max_hashtable.q | 13 ++ .../llap/join_max_hashtable.q.out | 154 +++++++++++++++++++ 4 files changed, 217 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 0d8d7ae..e4b09a2 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1435,9 +1435,14 @@ public class HiveConf extends Configuration { HIVECONVERTJOINMAXENTRIESHASHTABLE("hive.auto.convert.join.hashtable.max.entries", 40000000L, "If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. \n" + - "However, if it is on, and the predicated number of entries in hashtable for a given join \n" + + "However, if it is on, and the predicted number of entries in hashtable for a given join \n" + "input is larger than this number, the join will not be converted to a mapjoin. \n" + "The value \"-1\" means no limit."), + HIVECONVERTJOINMAXSHUFFLESIZE("hive.auto.convert.join.shuffle.max.size", 10000000L, + "If hive.auto.convert.join.noconditionaltask is off, this parameter does not take affect. \n" + + "However, if it is on, and the predicted size of the larger input for a given join is greater \n" + + "than this number, the join will not be converted to a dynamically partitioned hash join. \n" + + "The value \"-1\" means no limit."), HIVEHASHTABLEKEYCOUNTADJUSTMENT("hive.hashtable.key.count.adjustment", 1.0f, "Adjustment to mapjoin hashtable size derived from table and column statistics; the estimate" + " of the number of keys is divided by this value. If the value is 0, statistics are not used" + http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 21d0053..a2414f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -579,12 +579,12 @@ public class ConvertJoinMapJoin implements NodeProcessor { * for Dynamic Hash Join conversion consideration * @param skipJoinTypeChecks whether to skip join type checking * @param maxSize size threshold for Map Join conversion - * @param checkHashTableEntries whether to check threshold for distinct keys in hash table for Map Join + * @param checkMapJoinThresholds whether to check thresholds to convert to Map Join * @return returns big table position or -1 if it cannot be determined * @throws SemanticException */ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, - int buckets, boolean skipJoinTypeChecks, long maxSize, boolean checkHashTableEntries) + int buckets, boolean skipJoinTypeChecks, long maxSize, boolean checkMapJoinThresholds) throws SemanticException { if (!skipJoinTypeChecks) { /* @@ -634,6 +634,9 @@ public class ConvertJoinMapJoin implements NodeProcessor { // total size of the inputs long totalSize = 0; + // convert to DPHJ + boolean convertDPHJ = false; + for (int pos = 0; pos < joinOp.getParentOperators().size(); pos++) { Operator<? extends OperatorDesc> parentOp = joinOp.getParentOperators().get(pos); @@ -693,19 +696,19 @@ public class ConvertJoinMapJoin implements NodeProcessor { // We are replacing the current big table with a new one, thus // we need to count the current one as a map table then. totalSize += bigInputStat.getDataSize(); - // Check if number of distinct keys is larger than given max - // number of entries for HashMap. If it is, we do not convert. - if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) { - return -1; + // Check if number of distinct keys is greater than given max number of entries + // for HashMap + if (checkMapJoinThresholds && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) { + convertDPHJ = true; } } else if (!selectedBigTable) { // This is not the first table and we are not using it as big table, // in fact, we're adding this table as a map table totalSize += inputSize; - // Check if number of distinct keys is larger than given max - // number of entries for HashMap. If it is, we do not convert. - if (checkHashTableEntries && !checkNumberOfEntriesForHashTable(joinOp, pos, context)) { - return -1; + // Check if number of distinct keys is greater than given max number of entries + // for HashMap + if (checkMapJoinThresholds && !checkNumberOfEntriesForHashTable(joinOp, pos, context)) { + convertDPHJ = true; } } @@ -723,6 +726,13 @@ public class ConvertJoinMapJoin implements NodeProcessor { } + // Check if size of data to shuffle (larger table) is less than given max size + if (checkMapJoinThresholds && convertDPHJ + && checkShuffleSizeForLargeTable(joinOp, bigTablePosition, context)) { + LOG.debug("Conditions to convert to MapJoin are not met"); + return -1; + } + // We store the total memory that this MapJoin is going to use, // which is calculated as totalSize/buckets, with totalSize // equal to sum of small tables size. @@ -1087,13 +1097,36 @@ public class ConvertJoinMapJoin implements NodeProcessor { if (estimation > max) { // Estimation larger than max LOG.debug("Number of different entries for HashTable is greater than the max; " - + "we do not converting to MapJoin"); + + "we do not convert to MapJoin"); return false; } // We can proceed with the conversion return true; } + /* Returns true if it passes the test, false otherwise. */ + private boolean checkShuffleSizeForLargeTable(JoinOperator joinOp, int position, + OptimizeTezProcContext context) { + long max = HiveConf.getLongVar(context.parseContext.getConf(), + HiveConf.ConfVars.HIVECONVERTJOINMAXSHUFFLESIZE); + if (max < 1) { + // Max is disabled, we can safely return true + return true; + } + // Evaluate + ReduceSinkOperator rsOp = (ReduceSinkOperator) joinOp.getParentOperators().get(position); + Statistics inputStats = rsOp.getStatistics(); + long inputSize = inputStats.getDataSize(); + LOG.debug("Estimated size for input {}: {}; Max size for DPHJ conversion: {}", + position, inputSize, max); + if (inputSize > max) { + LOG.debug("Size of input is greater than the max; " + + "we do not convert to DPHJ"); + return false; + } + return true; + } + private static long estimateNDV(long numRows, List<ColStatistics> columnStats) { // If there is a single column, return the number of distinct values if (columnStats.size() == 1) { http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/ql/src/test/queries/clientpositive/join_max_hashtable.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/join_max_hashtable.q b/ql/src/test/queries/clientpositive/join_max_hashtable.q index 9c30a0d..8d0ccb7 100644 --- a/ql/src/test/queries/clientpositive/join_max_hashtable.q +++ b/ql/src/test/queries/clientpositive/join_max_hashtable.q @@ -1,6 +1,7 @@ set hive.auto.convert.join=true; set hive.optimize.dynamic.partition.hashjoin=true; set hive.auto.convert.join.hashtable.max.entries=500; +set hive.auto.convert.join.shuffle.max.size=100000; -- CONVERT EXPLAIN @@ -35,3 +36,15 @@ FROM src x JOIN src y ON (x.key = y.key); EXPLAIN SELECT x.key, x.value FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value); + +set hive.auto.convert.join.shuffle.max.size=80000; + +-- CONVERT +EXPLAIN +SELECT x.key, x.value +FROM src x JOIN src y ON (x.key = y.key); + +-- CONVERT +EXPLAIN +SELECT x.key, x.value +FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value); http://git-wip-us.apache.org/repos/asf/hive/blob/b67d52c2/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out b/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out index 63f5d28..ef1a6f3 100644 --- a/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out +++ b/ql/src/test/results/clientpositive/llap/join_max_hashtable.q.out @@ -498,3 +498,157 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: EXPLAIN +SELECT x.key, x.value +FROM src x JOIN src y ON (x.key = y.key) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT x.key, x.value +FROM src x JOIN src y ON (x.key = y.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 2 (BROADCAST_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: x + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1 + input vertices: + 1 Map 2 + Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: llap + LLAP IO: no inputs + Map 2 + Map Operator Tree: + TableScan + alias: y + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: EXPLAIN +SELECT x.key, x.value +FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT x.key, x.value +FROM src x JOIN src y ON (x.key = y.key AND x.value = y.value) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 2 (BROADCAST_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: x + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (key is not null and value is not null) (type: boolean) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string), _col1 (type: string) + 1 _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + input vertices: + 1 Map 2 + Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: llap + LLAP IO: no inputs + Map 2 + Map Operator Tree: + TableScan + alias: y + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (key is not null and value is not null) (type: boolean) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: string) + sort order: ++ + Map-reduce partition columns: _col0 (type: string), _col1 (type: string) + Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE + Execution mode: llap + LLAP IO: no inputs + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink +