http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index d58f447..83e89af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -266,11 +266,14 @@ public class GenTezUtils {
       }
     }
     // This TableScanOperator could be part of semijoin optimization.
-    Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap =
-        context.parseContext.getRsOpToTsOpMap();
-    for (ReduceSinkOperator rs : rsOpToTsOpMap.keySet()) {
-      if (rsOpToTsOpMap.get(rs) == orig) {
-        rsOpToTsOpMap.put(rs, (TableScanOperator) newRoot);
+    Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo =
+        context.parseContext.getRsToSemiJoinBranchInfo();
+    for (ReduceSinkOperator rs : rsToSemiJoinBranchInfo.keySet()) {
+      SemiJoinBranchInfo sjInfo = rsToSemiJoinBranchInfo.get(rs);
+      if (sjInfo.getTsOp() == orig) {
+        SemiJoinBranchInfo newSJInfo = new SemiJoinBranchInfo(
+            (TableScanOperator) newRoot, sjInfo.getIsHint());
+        rsToSemiJoinBranchInfo.put(rs, newSJInfo);
       }
     }
   }
@@ -516,19 +519,18 @@ public class GenTezUtils {
     return EdgeType.SIMPLE_EDGE;
   }
 
-  public static void processDynamicMinMaxPushDownOperator(
+  public static void processDynamicSemiJoinPushDownOperator(
           GenTezProcContext procCtx, RuntimeValuesInfo runtimeValuesInfo,
           ReduceSinkOperator rs) throws SemanticException {
-    TableScanOperator ts = procCtx.parseContext.getRsOpToTsOpMap().get(rs);
+    SemiJoinBranchInfo sjInfo = procCtx.parseContext.getRsToSemiJoinBranchInfo().get(rs);
     List<BaseWork> rsWorkList = procCtx.childToWorkMap.get(rs);
 
-    if (ts == null || rsWorkList == null) {
+    if (sjInfo == null || rsWorkList == null) {
       // This happens when the ReduceSink's edge has been removed by cycle
       // detection logic. Nothing to do here.
       return;
     }
-    LOG.debug("ResduceSink " + rs + " to TableScan " + ts);
 
     if (rsWorkList.size() != 1) {
       StringBuilder sb = new StringBuilder();
@@ -541,6 +543,9 @@ public class GenTezUtils {
       throw new SemanticException(rs + " belongs to multiple BaseWorks: " + sb.toString());
     }
 
+    TableScanOperator ts = sjInfo.getTsOp();
+    LOG.debug("ReduceSink " + rs + " to TableScan " + ts);
+
     BaseWork parentWork = rsWorkList.get(0);
     BaseWork childWork = procCtx.rootToWorkMap.get(ts);
 
@@ -611,7 +616,7 @@ public class GenTezUtils {
         skip = true;
       }
     }
-    context.getRsOpToTsOpMap().remove(rs);
+    context.getRsToSemiJoinBranchInfo().remove(rs);
   }
 
   private static class DynamicValuePredicateContext implements NodeProcessorCtx {
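
Note: the SemiJoinBranchInfo class used above is introduced elsewhere in this commit and does not appear in these hunks. A minimal sketch, inferred only from the constructor and accessors the hunks actually call (a (TableScanOperator, boolean) constructor, getTsOp(), getIsHint()), might look like the following; the field names are assumptions:

// Hypothetical sketch of SemiJoinBranchInfo, inferred from its usage in
// GenTezUtils above; the real class ships in another file of this commit.
package org.apache.hadoop.hive.ql.parse;

import org.apache.hadoop.hive.ql.exec.TableScanOperator;

public class SemiJoinBranchInfo {
  // Target TableScan of the semijoin branch.
  private final TableScanOperator tsOp;
  // Whether the branch originated from a user hint rather than the optimizer.
  private final boolean isHint;

  public SemiJoinBranchInfo(TableScanOperator tsOp, boolean isHint) {
    this.tsOp = tsOp;
    this.isHint = isHint;
  }

  public TableScanOperator getTsOp() { return tsOp; }
  public boolean getIsHint() { return isHint; }
}
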
http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g
index 8e70a46..e110fb3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HintParser.g
@@ -31,6 +31,7 @@ tokens {
   TOK_MAPJOIN;
   TOK_STREAMTABLE;
   TOK_HINTARGLIST;
+  TOK_LEFTSEMIJOIN;
 }
 
 @header {
@@ -69,6 +70,7 @@ hintItem
 
 hintName
     :
     KW_MAPJOIN -> TOK_MAPJOIN
+    | KW_SEMI -> TOK_LEFTSEMIJOIN
     | KW_STREAMTABLE -> TOK_STREAMTABLE
     ;
 
@@ -80,4 +82,5 @@ hintArgs
 hintArgName
     :
     Identifier
+    | Number
     ;

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index 3f9f76c..9a69f90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -33,17 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -126,11 +116,10 @@ public class ParseContext {
   private boolean needViewColumnAuthorization;
   private Set<FileSinkDesc> acidFileSinks = Collections.emptySet();
 
-  // Map to store mapping between reduce sink Operator and TS Operator for semijoin
-  private Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap =
-          new HashMap<ReduceSinkOperator, TableScanOperator>();
   private Map<ReduceSinkOperator, RuntimeValuesInfo> rsToRuntimeValuesInfo =
           new HashMap<ReduceSinkOperator, RuntimeValuesInfo>();
+  private Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo =
+          new HashMap<>();
 
   public ParseContext() {
   }
@@ -666,11 +655,11 @@ public class ParseContext {
     return rsToRuntimeValuesInfo;
   }
 
-  public void setRsOpToTsOpMap(Map<ReduceSinkOperator, TableScanOperator> rsOpToTsOpMap) {
-    this.rsOpToTsOpMap = rsOpToTsOpMap;
+  public void setRsToSemiJoinBranchInfo(Map<ReduceSinkOperator, SemiJoinBranchInfo> rsToSemiJoinBranchInfo) {
+    this.rsToSemiJoinBranchInfo = rsToSemiJoinBranchInfo;
   }
 
-  public Map<ReduceSinkOperator, TableScanOperator> getRsOpToTsOpMap() {
-    return rsOpToTsOpMap;
+  public Map<ReduceSinkOperator, SemiJoinBranchInfo> getRsToSemiJoinBranchInfo() {
+    return rsToSemiJoinBranchInfo;
   }
 }
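
Note: with KW_SEMI mapped to TOK_LEFTSEMIJOIN and Number accepted as a hint argument, the hint grammar can now carry semijoin hints with an optional bloom filter entry count. The sketch below is illustrative only: it assumes the KW_SEMI lexer rule matches the literal SEMI (the lexer change is not part of these hunks), and the table and column names are made up. The three shapes correspond to the three hint formats parsed by parseSemiJoinHint further down in this commit.

// Hypothetical query strings showing the three semijoin hint shapes the
// extended grammar accepts; SEMI keyword spelling and identifiers assumed.
public class SemiJoinHintExamples {
  // 1. alias, column, bloom filter entry count
  static final String FULL =
      "SELECT /*+ SEMI(d, d_key, 100000) */ f.x FROM f JOIN d ON f.k = d.d_key";
  // 2. alias, bloom filter entry count (no column)
  static final String NO_COLUMN =
      "SELECT /*+ SEMI(d, 100000) */ f.x FROM f JOIN d ON f.k = d.d_key";
  // 3. alias, column (entry count left unspecified)
  static final String NO_SIZE =
      "SELECT /*+ SEMI(d, d_key) */ f.x FROM f JOIN d ON f.k = d.d_key";
}
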
http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
index ec76fb7..bcef252 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.util.Arrays;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -82,6 +84,7 @@ public class QBJoinTree implements Serializable, Cloneable {
    * We then add a Filter Operator after the Join Operator for this QBJoinTree.
    */
   private final List<ASTNode> postJoinFilters;
+  private Map<String, SemiJoinHint> semiJoinHint;
 
   /**
    * constructor.
@@ -429,4 +432,17 @@ public class QBJoinTree implements Serializable, Cloneable {
     return cloned;
   }
 
+
+  public void setSemiJoinHint(Map<String, SemiJoinHint> semiJoinHint) {
+    this.semiJoinHint = semiJoinHint;
+  }
+
+  public Map<String, SemiJoinHint> getSemiJoinHint() {
+    return semiJoinHint;
+  }
+
+  @Override
+  public String toString() {
+    return "QBJoinTree [leftAlias=" + leftAlias + ", rightAliases=" + Arrays.toString(rightAliases) + ", leftAliases=" + Arrays.toString(leftAliases) + ", semiJoinHint=" + semiJoinHint + "]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b5a5645..e4ca25b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8122,6 +8122,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     JoinDesc desc = new JoinDesc(exprMap, outputColumnNames,
         join.getNoOuterJoin(), joinCondns, filterMap, joinKeys);
+    desc.setSemiJoinHints(join.getSemiJoinHint());
     desc.setReversedExprs(reversedExprs);
     desc.setFilterMap(join.getFilterMap());
     // For outer joins, add filters that apply to more than one input
@@ -8669,6 +8670,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       parseStreamTables(joinTree, qb);
     }
 
+    if (qb.getParseInfo().getHints() != null) {
+      // TODO: do we need this for unique join?
+      joinTree.setSemiJoinHint(parseSemiJoinHint(qb.getParseInfo().getHints()));
+    }
     return joinTree;
   }
 
@@ -8967,6 +8972,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       if ((conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) == false) {
         parseStreamTables(joinTree, qb);
       }
+
+      joinTree.setSemiJoinHint(parseSemiJoinHint(qb.getParseInfo().getHints()));
     }
 
     return joinTree;
@@ -9014,6 +9021,62 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     joinTree.setStreamAliases(streamAliases);
   }
 
+  /**
+   * Parses semijoin hints in the query and returns the table aliases mapped to their
+   * SemiJoinHint (the bloom filter entry count inside a hint is null when not specified).
+   * Hints can be in 3 formats:
+   * 1. TableName, ColumnName, bloom filter entries
+   * 2. TableName, bloom filter entries
+   * 3. TableName, ColumnName
+   */
+  public Map<String, SemiJoinHint> parseSemiJoinHint(ASTNode hints) throws SemanticException {
+    if (hints == null) return null;
+    Map<String, SemiJoinHint> result = null;
+    for (Node hintNode : hints.getChildren()) {
+      ASTNode hint = (ASTNode) hintNode;
+      if (hint.getChild(0).getType() != HintParser.TOK_LEFTSEMIJOIN) continue;
+      if (result == null) {
+        result = new HashMap<>();
+      }
+      String alias = null;
+      String colName = null;
+      Tree args = hint.getChild(1);
+      for (int i = 0; i < args.getChildCount(); i++) {
+        // We can have table names, column names or sizes here (or incorrect hint if the user is so inclined).
+        String text = args.getChild(i).getText();
+        Integer number = null;
+        try {
+          number = Integer.parseInt(text);
+        } catch (NumberFormatException ex) { // Ignore.
+        }
+        if (number != null) {
+          if (alias == null) {
+            throw new SemanticException("Invalid semijoin hint - arg " + i + " ("
+                + text + ") is a number but the previous one is not an alias");
+          }
+          SemiJoinHint sjHint = new SemiJoinHint(alias, colName, number);
+          result.put(alias, sjHint);
+          alias = null;
+          colName = null;
+        } else {
+          if (alias == null) {
+            alias = text;
+          } else if (colName == null) {
+            colName = text;
+          } else {
+            // No bloom filter entries provided.
+            SemiJoinHint sjHint = new SemiJoinHint(alias, colName, null);
+            result.put(alias, sjHint);
+            alias = text;
+            colName = null;
+          }
+        }
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Semijoin hint parsed: " + result);
+    }
+    return result;
+  }
+
   /**
    * Merges node to target
   */
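
Note: to make the folding behavior of parseSemiJoinHint concrete, here is a small standalone re-implementation of its inner loop that can be run directly. SemiJoinHint is reduced to a formatted string and the SemanticException path for a leading number is omitted; everything else mirrors the loop above. Observe that a trailing alias/column pair with no following argument is never flushed into the map by this version of the loop.

// Standalone demo of the argument-folding logic in parseSemiJoinHint.
// For illustration only; aliases and sizes are made up.
import java.util.HashMap;
import java.util.Map;

public class SemiJoinHintFoldDemo {
  public static void main(String[] args) {
    // Three hints flattened into one argument list, as the hint AST delivers them.
    String[] hintArgs = {"t1", "c1", "1000", "t2", "500", "t3", "c3"};
    Map<String, String> result = new HashMap<>();
    String alias = null, colName = null;
    for (String text : hintArgs) {
      Integer number = null;
      try {
        number = Integer.parseInt(text);
      } catch (NumberFormatException ex) { // Not a number: alias or column name.
      }
      if (number != null) {
        // A number closes the current hint: alias with optional column.
        result.put(alias, "(" + alias + ", " + colName + ", " + number + ")");
        alias = null;
        colName = null;
      } else if (alias == null) {
        alias = text;
      } else if (colName == null) {
        colName = text;
      } else {
        // Third non-number closes the previous hint without an entry count.
        result.put(alias, "(" + alias + ", " + colName + ", null)");
        alias = text;
        colName = null;
      }
    }
    // Expected contents: t1=(t1, c1, 1000) and t2=(t2, null, 500).
    // The trailing (t3, c3) pair is dropped, mirroring the committed loop,
    // which has no flush after the argument iteration ends.
    System.out.println(result);
  }
}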