HIVE-16423: Add hint to enforce semi join optimization (Deepak Jaiswal, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9d5d737d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9d5d737d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9d5d737d

Branch: refs/heads/master
Commit: 9d5d737db4f715a880f0d544d548a5ce370f602b
Parents: fa24d4b
Author: Gunther Hagleitner <gunt...@apache.org>
Authored: Thu Apr 20 10:07:52 2017 -0700
Committer: Gunther Hagleitner <gunt...@apache.org>
Committed: Thu Apr 20 10:07:52 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |     2 +
 .../test/resources/testconfiguration.properties |     1 +
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |     4 +-
 .../DynamicPartitionPruningOptimization.java    |   102 +-
 .../calcite/translator/HiveOpConverter.java     |    24 +-
 .../hadoop/hive/ql/parse/CalcitePlanner.java    |    35 +
 .../hive/ql/parse/CalcitePlanner.java.orig      |  4188 +++++
 .../hadoop/hive/ql/parse/GenTezUtils.java       |    25 +-
 .../apache/hadoop/hive/ql/parse/HintParser.g    |     3 +
 .../hadoop/hive/ql/parse/ParseContext.java      |    25 +-
 .../apache/hadoop/hive/ql/parse/QBJoinTree.java |    16 +
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |    63 +
 .../hive/ql/parse/SemanticAnalyzer.java.orig    | 13508 +++++++++++++++++
 .../hive/ql/parse/SemiJoinBranchInfo.java       |    45 +
 .../hadoop/hive/ql/parse/SemiJoinHint.java      |    43 +
 .../hadoop/hive/ql/parse/TaskCompiler.java      |     2 +-
 .../hadoop/hive/ql/parse/TezCompiler.java       |   137 +-
 .../hive/ql/plan/ExprNodeDynamicListDesc.java   |    15 +-
 .../apache/hadoop/hive/ql/plan/JoinDesc.java    |    18 +
 .../hive/ql/ppd/SyntheticJoinPredicate.java     |     6 +-
 .../ql/udf/generic/GenericUDAFBloomFilter.java  |    13 +
 .../test/queries/clientpositive/semijoin_hint.q |    54 +
 .../clientpositive/llap/semijoin_hint.q.out     |   899 ++
 23 files changed, 19107 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
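
Note (illustration, not part of the patch): two small helper classes, SemiJoinHint and SemiJoinBranchInfo, are added by this commit but their bodies are not quoted in the hunks below. Judging only from how SemiJoinHint is used later in this diff (getColName() and getNumEntries() are the only accessors referenced), a rough sketch of its shape might look like the following; the constructor and field types are assumptions.

    // Hypothetical sketch of the new SemiJoinHint value holder (actual file: SemiJoinHint.java, 43 lines).
    public class SemiJoinHint {
      private final String colName;    // column the hinted semijoin should filter on (may be null)
      private final long numEntries;   // expected entry count for the bloom filter (<= 0 if unset)

      public SemiJoinHint(String colName, long numEntries) {
        this.colName = colName;
        this.numEntries = numEntries;
      }

      public String getColName() { return colName; }
      public long getNumEntries() { return numEntries; }
    }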


http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 420d35e..b10b08e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2892,6 +2892,8 @@ public class HiveConf extends Configuration {
             "Big table for runtime filteting should be of atleast this size"),
     
TEZ_DYNAMIC_SEMIJOIN_REDUCTION_THRESHOLD("hive.tez.dynamic.semijoin.reduction.threshold",
 (float) 0.50,
             "Only perform semijoin optimization if the estimated benefit at or 
above this fraction of the target table"),
+    
TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY("hive.tez.dynamic.semijoin.reduction.hint.only",
 false,
+            "When true, only enforce semijoin when a hint is provided"),
     TEZ_SMB_NUMBER_WAVES(
         "hive.tez.smb.number.waves",
         (float) 0.5,
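
Usage note (not from the patch itself): the new flag defaults to false, so existing semijoin reduction behavior is unchanged unless it is turned on. A minimal sketch of flipping and reading it through HiveConf, assuming a plain HiveConf instance (import of org.apache.hadoop.hive.conf.HiveConf omitted):

    // Sketch only: enable hint-only semijoin reduction and read the flag back.
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY, true);
    boolean hintOnly = conf.getBoolVar(HiveConf.ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY);
    // When hintOnly is true, generateSemiJoinOperatorPlan (below) bails out unless a SemiJoinHint is present.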

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index ed5ce9d..116d0eb 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -572,6 +572,7 @@ minillaplocal.query.files=acid_globallimit.q,\
   schema_evol_text_vecrow_table.q,\
   selectDistinctStar.q,\
   semijoin.q,\
+  semijoin_hint.q,\
   smb_cache.q,\
   special_character_in_tabnames_1.q,\
   sqlmerge.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index db6b05b..637bc54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -794,7 +794,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       // The semijoin branch can potentially create a task level cycle
       // with the hashjoin except when it is dynamically partitioned hash
       // join which takes place in a separate task.
-      if (context.parseContext.getRsOpToTsOpMap().size() > 0
+      if (context.parseContext.getRsToSemiJoinBranchInfo().size() > 0
               && removeReduceSink) {
         removeCycleCreatingSemiJoinOps(mapJoinOp, parentSelectOpOfBigTableOp,
                 context.parseContext);
@@ -826,7 +826,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       }
 
       ReduceSinkOperator rs = (ReduceSinkOperator) op;
-      TableScanOperator ts = parseContext.getRsOpToTsOpMap().get(rs);
+      TableScanOperator ts = parseContext.getRsToSemiJoinBranchInfo().get(rs).getTsOp();
       if (ts == null) {
         // skip, no semijoin branch
         continue;
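
The lookup above replaces the old RS-to-TS map with a per-ReduceSink SemiJoinBranchInfo. That class is new in this commit and not quoted in this message; a hypothetical minimal shape, inferred only from the constructor call and getTsOp() usage in this diff (the accessor name for the hint flag is a guess, and imports are omitted):

    // Hypothetical sketch of SemiJoinBranchInfo (actual file: SemiJoinBranchInfo.java, 45 lines).
    public class SemiJoinBranchInfo {
      private final TableScanOperator ts;  // table scan targeted by the semijoin branch
      private final boolean isHint;        // whether the branch came from a user hint

      public SemiJoinBranchInfo(TableScanOperator ts, boolean isHint) {
        this.ts = ts;
        this.isHint = isHint;
      }

      public TableScanOperator getTsOp() { return ts; }
      public boolean getIsHint() { return isHint; }
    }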

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index 838cc69..eb3eba5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -43,12 +43,7 @@ import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
-import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.RuntimeValuesInfo;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.*;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBloomFilter.GenericUDAFBloomFilterEvaluator;
@@ -215,16 +210,25 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
         } else {
           LOG.debug("Column " + column + " is not a partition column");
           if (semiJoin && ts.getConf().getFilterExpr() != null) {
-            LOG.debug("Initiate semijoin reduction for " + column);
-            // Get the table name from which the min-max values will come.
+            LOG.debug("Initiate semijoin reduction for " + column + " ("
+                + ts.getConf().getFilterExpr().getExprString());
+            // Get the table name from which the min-max values and bloom filter will come.
             Operator<?> op = ctx.generator;
+
             while (!(op == null || op instanceof TableScanOperator)) {
               op = op.getParentOperators().get(0);
             }
             String tableAlias = (op == null ? "" : ((TableScanOperator) op).getConf().getAlias());
+
+            Map<String, SemiJoinHint> hints = ctx.desc.getHints();
+            SemiJoinHint sjHint = (hints != null) ? hints.get(tableAlias) : null;
             keyBaseAlias = ctx.generator.getOperatorId() + "_" + tableAlias + "_" + column;
 
-            semiJoinAttempted = generateSemiJoinOperatorPlan(ctx, parseContext, ts, keyBaseAlias);
+            semiJoinAttempted = generateSemiJoinOperatorPlan(
+                ctx, parseContext, ts, keyBaseAlias, sjHint);
+            if (!semiJoinAttempted && sjHint != null) {
+              throw new SemanticException("The user hint to enforce semijoin failed required conditions");
+            }
           }
         }
 
@@ -387,7 +391,13 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
 
   // Generates plan for min/max when dynamic partition pruning is ruled out.
   private boolean generateSemiJoinOperatorPlan(DynamicListContext ctx, ParseContext parseContext,
-      TableScanOperator ts, String keyBaseAlias) throws SemanticException {
+      TableScanOperator ts, String keyBaseAlias, SemiJoinHint sjHint) throws SemanticException {
+
+    // If semijoin hint is enforced, make sure hint is provided
+    if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
+            && sjHint == null) {
+        return false;
+    }
 
     // we will put a fork in the plan at the source of the reduce sink
     Operator<? extends OperatorDesc> parentOfRS = ctx.generator.getParentOperators().get(0);
@@ -441,6 +451,14 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
       }
     }
 
+    // If hint is provided and only hinted semijoin optimizations should be
+    // created, then skip other columns on the table
+    if (parseContext.getConf().getBoolVar(ConfVars.TEZ_DYNAMIC_SEMIJOIN_REDUCTION_HINT_ONLY)
+            && sjHint.getColName() != null &&
+            !internalColName.equals(sjHint.getColName())) {
+      return false;
+    }
+
     List<ExprNodeDesc> keyExprs = new ArrayList<ExprNodeDesc>();
     keyExprs.add(key);
 
@@ -484,8 +502,6 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
             HiveConf.getFloatVar(parseContext.getConf(),
                     HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
 
-    ArrayList<ExprNodeDesc> groupByExprs = new ArrayList<ExprNodeDesc>();
-
     // Add min/max and bloom filter aggregations
     List<ObjectInspector> aggFnOIs = new ArrayList<ObjectInspector>();
     aggFnOIs.add(key.getWritableObjectInspector());
@@ -505,8 +521,14 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
       AggregationDesc bloomFilter = new AggregationDesc("bloom_filter",
               FunctionRegistry.getGenericUDAFEvaluator("bloom_filter", aggFnOIs, false, false),
               params, false, Mode.PARTIAL1);
-      GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
+      GenericUDAFBloomFilterEvaluator bloomFilterEval =
+          (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
       bloomFilterEval.setSourceOperator(selectOp);
+
+      if (sjHint != null && sjHint.getNumEntries() > 0) {
+        LOG.debug("Setting size for " + keyBaseAlias + " to " + sjHint.getNumEntries() + " based on the hint");
+        bloomFilterEval.setHintEntries(sjHint.getNumEntries());
+      }
       bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
       bloomFilterEval.setMinEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
       bloomFilterEval.setFactor(parseContext.getConf().getFloatVar(ConfVars.TEZ_BLOOM_FILTER_FACTOR));
@@ -607,6 +629,9 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
               bloomFilterFinalParams, false, Mode.FINAL);
       GenericUDAFBloomFilterEvaluator bloomFilterEval = (GenericUDAFBloomFilterEvaluator) bloomFilter.getGenericUDAFEvaluator();
       bloomFilterEval.setSourceOperator(selectOp);
+      if (sjHint != null && sjHint.getNumEntries() > 0) {
+        bloomFilterEval.setHintEntries(sjHint.getNumEntries());
+      }
       bloomFilterEval.setMaxEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MAX_BLOOM_FILTER_ENTRIES));
       bloomFilterEval.setMinEntries(parseContext.getConf().getLongVar(ConfVars.TEZ_MIN_BLOOM_FILTER_ENTRIES));
       bloomFilterEval.setFactor(parseContext.getConf().getFloatVar(ConfVars.TEZ_BLOOM_FILTER_FACTOR));
@@ -635,23 +660,56 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
       rsOp.getConf().setOutputOperators(outputOperators);
     }
 
+    createFinalRsForSemiJoinOp(parseContext, ts, groupByOpFinal, key,
+            keyBaseAlias, ctx.parent.getChildren().get(0), sjHint != null);
+
+    return true;
+  }
+
+  private void createFinalRsForSemiJoinOp(
+          ParseContext parseContext, TableScanOperator ts, GroupByOperator gb,
+          ExprNodeDesc key, String keyBaseAlias, ExprNodeDesc colExpr,
+          boolean isHint) throws SemanticException {
+    ArrayList<String> gbOutputNames = new ArrayList<>();
+    // One each for min, max and bloom filter
+    gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(0));
+    gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(1));
+    gbOutputNames.add(SemanticAnalyzer.getColumnInternalName(2));
+
+    int colPos = 0;
+    ArrayList<ExprNodeDesc> rsValueCols = new ArrayList<ExprNodeDesc>();
+    for (int i = 0; i < gbOutputNames.size() - 1; i++) {
+      ExprNodeColumnDesc expr = new ExprNodeColumnDesc(key.getTypeInfo(),
+              gbOutputNames.get(colPos++), "", false);
+      rsValueCols.add(expr);
+    }
+
+    // Bloom Filter uses binary
+    ExprNodeColumnDesc colBFExpr = new ExprNodeColumnDesc(TypeInfoFactory.binaryTypeInfo,
+            gbOutputNames.get(colPos++), "", false);
+    rsValueCols.add(colBFExpr);
+
     // Create the final Reduce Sink Operator
     ReduceSinkDesc rsDescFinal = PlanUtils.getReduceSinkDesc(
             new ArrayList<ExprNodeDesc>(), rsValueCols, gbOutputNames, false,
             -1, 0, 1, Operation.NOT_ACID);
     ReduceSinkOperator rsOpFinal = (ReduceSinkOperator)OperatorFactory.getAndMakeChild(
-            rsDescFinal, new RowSchema(groupByOpFinal.getSchema()), groupByOpFinal);
+            rsDescFinal, new RowSchema(gb.getSchema()), gb);
+    Map<String, ExprNodeDesc> columnExprMap = new HashMap<>();
     rsOpFinal.setColumnExprMap(columnExprMap);
 
-    LOG.debug("DynamicMinMaxPushdown: Saving RS to TS mapping: " + rsOpFinal + 
": " + ts);
-    parseContext.getRsOpToTsOpMap().put(rsOpFinal, ts);
+    LOG.debug("DynamicSemiJoinPushdown: Saving RS to TS mapping: " + rsOpFinal 
+ ": " + ts);
+    SemiJoinBranchInfo sjInfo = new SemiJoinBranchInfo(ts, isHint);
+    parseContext.getRsToSemiJoinBranchInfo().put(rsOpFinal, sjInfo);
 
     // for explain purpose
-    if (parseContext.getContext().getExplainConfig() != null
-        && parseContext.getContext().getExplainConfig().isFormatted()) {
-      List<String> outputOperators = new ArrayList<>();
+    if (parseContext.getContext().getExplainConfig() != null &&
+            parseContext.getContext().getExplainConfig().isFormatted()) {
+      List<String> outputOperators = rsOpFinal.getConf().getOutputOperators();
+      if (outputOperators == null) {
+        outputOperators = new ArrayList<>();
+      }
       outputOperators.add(ts.getOperatorId());
-      rsOpFinal.getConf().setOutputOperators(outputOperators);
     }
 
     // Save the info that is required at query time to resolve dynamic/runtime values.
@@ -666,10 +724,8 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
     runtimeValuesInfo.setTableDesc(rsFinalTableDesc);
     runtimeValuesInfo.setDynamicValueIDs(dynamicValueIDs);
     runtimeValuesInfo.setColExprs(rsValueCols);
-    runtimeValuesInfo.setTsColExpr(ctx.parent.getChildren().get(0));
+    runtimeValuesInfo.setTsColExpr(colExpr);
     parseContext.getRsToRuntimeValuesInfoMap().put(rsOpFinal, runtimeValuesInfo);
-
-    return true;
   }
 
   private Map<Node, Object> collectDynamicPruningConditions(ExprNodeDesc pred, NodeProcessorCtx ctx)
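
To summarize the gating the hunks above add to generateSemiJoinOperatorPlan: the semijoin branch is skipped when hint-only mode is on and no hint exists, skipped when the hint names a different column, and the hinted entry count (if any) overrides the bloom filter sizing via setHintEntries. A condensed, hypothetical restatement of the first two checks (the helper name and its isolation into a method are mine, not the patch's):

    // Sketch: condensed decision logic mirroring the checks added above.
    private static boolean shouldBuildSemiJoinBranch(
        boolean hintOnlyMode, SemiJoinHint sjHint, String internalColName) {
      if (hintOnlyMode && sjHint == null) {
        return false;                       // hint-only mode requires an explicit hint
      }
      if (hintOnlyMode && sjHint.getColName() != null
          && !internalColName.equals(sjHint.getColName())) {
        return false;                       // hint targets a different column on this table
      }
      return true;                          // otherwise build the min/max + bloom filter branch
    }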

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index 73a9b0f..d375d1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.optimizer.calcite.translator;
 
 
+import org.apache.hadoop.hive.ql.parse.*;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -72,19 +74,8 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortExchange
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
-import org.apache.hadoop.hive.ql.parse.JoinCond;
-import org.apache.hadoop.hive.ql.parse.JoinType;
-import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
-import org.apache.hadoop.hive.ql.parse.PTFTranslator;
-import org.apache.hadoop.hive.ql.parse.ParseUtils;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.UnparseTranslator;
-import org.apache.hadoop.hive.ql.parse.WindowingComponentizer;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -348,6 +339,9 @@ public class HiveOpConverter {
     // through Hive
     String[] baseSrc = new String[joinRel.getInputs().size()];
     String tabAlias = getHiveDerivedTableAlias();
+    Map<String, SemiJoinHint> semiJoinHints = semanticAnalyzer.parseSemiJoinHint(
+        semanticAnalyzer.getQB().getParseInfo().getHints());
+
     // 1. Convert inputs
     OpAttr[] inputs = new OpAttr[joinRel.getInputs().size()];
     List<Operator<?>> children = new ArrayList<Operator<?>>(joinRel.getInputs().size());
@@ -413,7 +407,7 @@ public class HiveOpConverter {
 
     // 6. Generate Join operator
-    JoinOperator joinOp = genJoin(joinRel, joinExpressions, filterExpressions, children,
-            baseSrc, tabAlias);
+            baseSrc, tabAlias, semiJoinHints);
 
     // 7. Return result
     return new OpAttr(tabAlias, newVcolsInCalcite, joinOp);
@@ -726,7 +720,7 @@ public class HiveOpConverter {
       List<String> keepColNames) throws SemanticException {
     // 1. Generate RS operator
     // 1.1 Prune the tableNames, only count the tableNames that are not empty strings
-       // as empty string in table aliases is only allowed for virtual columns.
+  // as empty string in table aliases is only allowed for virtual columns.
     String tableAlias = null;
     Set<String> tableNames = input.getSchema().getTableNames();
     for (String tableName : tableNames) {
@@ -885,7 +879,8 @@ public class HiveOpConverter {
 
   private static JoinOperator genJoin(RelNode join, ExprNodeDesc[][] joinExpressions,
       List<List<ExprNodeDesc>> filterExpressions, List<Operator<?>> children,
-      String[] baseSrc, String tabAlias) throws SemanticException {
+      String[] baseSrc, String tabAlias, Map<String, SemiJoinHint> semiJoinHints)
+          throws SemanticException {
 
     // 1. Extract join type
     JoinCondDesc[] joinCondns;
@@ -1011,6 +1006,7 @@ public class HiveOpConverter {
     // 4. We create the join operator with its descriptor
     JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns,
             filters, joinExpressions);
+    desc.setSemiJoinHints(semiJoinHints);
     desc.setReversedExprs(reversedExprs);
     desc.setFilterMap(filterMap);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index c97b3e7..d10b6bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.antlr.runtime.tree.Tree;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
@@ -332,6 +334,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
       skipCalcitePlan = true;
     } else {
       PreCboCtx cboCtx = (PreCboCtx) plannerCtx;
+      ASTNode oldHints = getQB().getParseInfo().getHints();
 
       // Note: for now, we don't actually pass the queryForCbo to CBO, because
       // it accepts qb, not AST, and can also access all the private stuff in
@@ -399,6 +402,15 @@ public class CalcitePlanner extends SemanticAnalyzer {
                 newAST = reAnalyzeCTASAfterCbo(newAST);
               }
             }
+            if (oldHints != null) {
+              if (getQB().getParseInfo().getHints() != null) {
+                LOG.warn("Hints are not null in the optimized tree; before CBO 
" + oldHints.dump()
+                    + "; after CBO " + 
getQB().getParseInfo().getHints().dump());
+              } else {
+                LOG.debug("Propagating hints to QB: " + oldHints);
+                getQB().getParseInfo().setHints(oldHints);
+              }
+            }
             Phase1Ctx ctx_1 = initPhase1Ctx();
             if (!doPhase1(newAST, getQB(), ctx_1, null)) {
               throw new RuntimeException("Couldn't do phase1 on CBO optimized query plan");
@@ -3462,6 +3474,24 @@ public class CalcitePlanner extends SemanticAnalyzer {
       return selRel;
     }
 
+    private void setQueryHints(QB qb) throws SemanticException {
+      QBParseInfo qbp = getQBParseInfo(qb);
+      String selClauseName = qbp.getClauseNames().iterator().next();
+      Tree selExpr0 = qbp.getSelForClause(selClauseName).getChild(0);
+
+      if (selExpr0.getType() != HiveParser.QUERY_HINT) return;
+      String hint = ctx.getTokenRewriteStream().toString(
+          selExpr0.getTokenStartIndex(), selExpr0.getTokenStopIndex());
+      LOG.debug("Handling query hints: " + hint);
+      ParseDriver pd = new ParseDriver();
+      try {
+        ASTNode hintNode = pd.parseHint(hint);
+        qbp.setHints((ASTNode) hintNode);
+      } catch (ParseException e) {
+        throw new SemanticException("failed to parse query hint: 
"+e.getMessage(), e);
+      }
+    }
+
     /**
      * NOTE: there can only be one select caluse since we don't handle multi
      * destination insert.
@@ -3960,7 +3990,12 @@ public class CalcitePlanner extends SemanticAnalyzer {
         throw new CalciteSemanticException("Unsupported", UnsupportedFeature.Others);
 
       }
+
       // 1.3 process join
+      // 1.3.1 process hints
+      setQueryHints(qb);
+
+      // 1.3.2 process the actual join
       if (qb.getParseInfo().getJoinExpr() != null) {
         srcRel = genJoinLogicalPlan(qb.getParseInfo().getJoinExpr(), aliasToRel);
       } else {
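
For completeness, a rough illustration of what setQueryHints does with a hinted query: the raw hint text found at the head of the select clause is re-parsed with ParseDriver.parseHint and stashed on the QBParseInfo so the semijoin machinery can read it after CBO. The helper below is hypothetical, and the hint text is only an assumed example; the accepted syntax is defined by HintParser.g, whose three added lines are not shown in this message.

    // Sketch: re-parsing a hint string the way setQueryHints above does.
    private void applyHint(QBParseInfo qbp, String hintText) throws SemanticException {
      ParseDriver pd = new ParseDriver();
      try {
        ASTNode hintNode = pd.parseHint(hintText);   // same ParseDriver call used in setQueryHints
        qbp.setHints(hintNode);                      // stash on QBParseInfo, as the CBO path above does
      } catch (ParseException e) {
        throw new SemanticException("failed to parse query hint: " + e.getMessage(), e);
      }
    }
    // e.g. applyHint(qbp, "semi(a, key, 5000)");    // "semi(...)" is an assumed hint form, see HintParser.g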
