http://git-wip-us.apache.org/repos/asf/hive/blob/9d5d737d/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java.orig
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java.orig b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java.orig
new file mode 100644
index 0000000..c97b3e7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java.orig
@@ -0,0 +1,4188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.math.BigDecimal;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.antlr.runtime.ClassicToken;
+import org.antlr.runtime.CommonToken;
+import org.antlr.runtime.tree.TreeVisitor;
+import org.antlr.runtime.tree.TreeVisitorAction;
+import org.apache.calcite.adapter.druid.DruidQuery;
+import org.apache.calcite.adapter.druid.DruidRules;
+import org.apache.calcite.adapter.druid.DruidSchema;
+import org.apache.calcite.adapter.druid.DruidTable;
+import org.apache.calcite.adapter.druid.LocalInterval;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.CachingRelMetadataProvider;
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.FilterMergeRule;
+import org.apache.calcite.rel.rules.JoinToMultiJoinRule;
+import org.apache.calcite.rel.rules.LoptOptimizeJoinRule;
+import org.apache.calcite.rel.rules.ProjectMergeRule;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.rules.SemiJoinFilterTransposeRule;
+import org.apache.calcite.rel.rules.SemiJoinJoinTransposeRule;
+import org.apache.calcite.rel.rules.SemiJoinProjectTransposeRule;
+import org.apache.calcite.rel.rules.UnionMergeRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexExecutor;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.CompositeList;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.conf.Constants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryProperties;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
+import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSubquerySemanticException;
+import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
+import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteViewSemanticException;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveCalciteUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveDefaultRelMetadataProvider;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HivePlannerContext;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRexExecutorImpl;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.TraitsUtil;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveAlgorithmsConf;
+import org.apache.hadoop.hive.ql.optimizer.calcite.cost.HiveVolcanoPlanner;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveAggregate;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveExcept;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveGroupingID;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveIntersect;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveProject;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveRelNode;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSemiJoin;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveSortLimit;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableFunctionScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveUnion;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregateJoinTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregateProjectMergeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveAggregatePullUpConstantsRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveExceptRewriteRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveExpandDistinctAggregatesRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterAggregateTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterJoinRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTSTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterProjectTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSetOpTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveFilterSortTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveInsertExchange4JoinRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveIntersectMergeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveIntersectRewriteRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinAddNotNullRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinCommuteRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinProjectTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinPushTransitivePredicatesRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveJoinToMultiJoinRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePartitionPruneRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePointLookupOptimizerRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HivePreFilteringRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectFilterPullUpConstantsRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectMergeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectOverIntersectRemoveRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveProjectSortTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveReduceExpressionsRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveReduceExpressionsWithStatsRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelDecorrelator;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRelFieldTrimmer;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveRulesRegistry;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSemiJoinRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortJoinReduceRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortLimitPullUpConstantsRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortMergeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortProjectTransposeRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortRemoveRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSortUnionReduceRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveSubQueryRemoveRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveUnionPullUpConstantsRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.HiveWindowingFixRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewFilterScanRule;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTBuilder;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.ASTConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinCondTypeCheckProcFactory;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.JoinTypeCheckCtx;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.PlanModifierForReturnPath;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.RexNodeConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.SqlFunctionConverter;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec;
+import org.apache.hadoop.hive.ql.parse.QBExpr.Opcode;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.calcite.config.CalciteConnectionConfig;
+
+public class CalcitePlanner extends SemanticAnalyzer {
+
+  private final AtomicInteger noColsMissingStats = new AtomicInteger(0);
+  private SemanticException semanticException;
+  private boolean runCBO = true;
+  private boolean disableSemJoinReordering = true;
+  private EnumSet<ExtendedCBOProfile> profilesCBO;
+
+  public CalcitePlanner(QueryState queryState) throws SemanticException {
+    super(queryState);
+    if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED)) {
+      runCBO = false;
+      disableSemJoinReordering = false;
+    }
+  }
+
+  public void resetCalciteConfiguration() {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED)) {
+      runCBO = true;
+      disableSemJoinReordering = true;
+    }
+  }
+
+  @Override
+  @SuppressWarnings("nls")
+  public void analyzeInternal(ASTNode ast) throws SemanticException {
+    if (runCBO) {
+      PreCboCtx cboCtx = new PreCboCtx();
+      super.analyzeInternal(ast, cboCtx);
+    } else {
+      super.analyzeInternal(ast);
+    }
+  }
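// Editor's sketch of how a caller might drive this planner; this mirrors the way
// materializeCTE() below sets up a nested analyzer, and assumes `queryState`,
// `ctx`, and `ast` are already prepared the same way:
//
//   CalcitePlanner planner = new CalcitePlanner(queryState);
//   planner.initCtx(ctx);
//   planner.init(false);
//   planner.analyzeInternal(ast);  // goes through CBO when hive.cbo.enable=true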
+
+  /**
+   * This method is useful if we want to obtain the logical plan after it has
+   * been parsed and optimized by Calcite.
+   *
+   * @return the Calcite plan for the query, null if it could not be generated
+   */
+  public RelNode genLogicalPlan(ASTNode ast) throws SemanticException {
+    LOG.info("Starting generating logical plan");
+    PreCboCtx cboCtx = new PreCboCtx();
+    //change the location of position alias process here
+    processPositionAlias(ast);
+    if (!genResolvedParseTree(ast, cboCtx)) {
+      return null;
+    }
+    ASTNode queryForCbo = ast;
+    if (cboCtx.type == PreCboCtx.Type.CTAS || cboCtx.type == PreCboCtx.Type.VIEW) {
+      queryForCbo = cboCtx.nodeOfInterest; // nodeOfInterest is the query
+    }
+    runCBO = canCBOHandleAst(queryForCbo, getQB(), cboCtx);
+    if (!runCBO) {
+      return null;
+    }
+    profilesCBO = obtainCBOProfiles(queryProperties);
+    disableJoinMerge = true;
+    final RelNode resPlan = logicalPlan();
+    LOG.info("Finished generating logical plan");
+    return resPlan;
+  }
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
+    Operator sinkOp = null;
+    boolean skipCalcitePlan = false;
+
+    if (!runCBO) {
+      skipCalcitePlan = true;
+    } else {
+      PreCboCtx cboCtx = (PreCboCtx) plannerCtx;
+
+      // Note: for now, we don't actually pass the queryForCbo to CBO, because
+      // it accepts qb, not AST, and can also access all the private stuff in
+      // SA. We rely on the fact that CBO ignores the unknown tokens (create
+      // table, destination), so if the query is otherwise ok, it is as if we
+      // did remove those and gave CBO the proper AST. That is kinda hacky.
+      ASTNode queryForCbo = ast;
+      if (cboCtx.type == PreCboCtx.Type.CTAS || cboCtx.type == PreCboCtx.Type.VIEW) {
+        queryForCbo = cboCtx.nodeOfInterest; // nodeOfInterest is the query
+      }
+      runCBO = canCBOHandleAst(queryForCbo, getQB(), cboCtx);
+      if (queryProperties.hasMultiDestQuery()) {
+        handleMultiDestQuery(ast, cboCtx);
+      }
+
+      if (runCBO) {
+        profilesCBO = obtainCBOProfiles(queryProperties);
+
+        disableJoinMerge = true;
+        boolean reAnalyzeAST = false;
+        final boolean materializedView = getQB().isMaterializedView();
+
+        try {
+          if (this.conf.getBoolVar(HiveConf.ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
+            if (cboCtx.type == PreCboCtx.Type.VIEW && !materializedView) {
+              throw new SemanticException("Create view is not supported in cbo return path.");
+            }
+            sinkOp = getOptimizedHiveOPDag();
+            LOG.info("CBO Succeeded; optimized logical plan.");
+            this.ctx.setCboInfo("Plan optimized by CBO.");
+            this.ctx.setCboSucceeded(true);
+          } else {
+            // 1. Gen Optimized AST
+            ASTNode newAST = getOptimizedAST();
+
+            // 1.1. Fix up the query for insert/ctas/materialized views
+            newAST = fixUpAfterCbo(ast, newAST, cboCtx);
+
+            // 2. Regen OP plan from optimized AST
+            if (cboCtx.type == PreCboCtx.Type.VIEW && !materializedView) {
+              try {
+                handleCreateViewDDL(newAST);
+              } catch (SemanticException e) {
+                throw new CalciteViewSemanticException(e.getMessage());
+              }
+            } else {
+              init(false);
+              if (cboCtx.type == PreCboCtx.Type.VIEW && materializedView) {
+                // Redo create-table/view analysis, because it's not part of
+                // doPhase1.
+                // Use the REWRITTEN AST
+                setAST(newAST);
+                newAST = reAnalyzeViewAfterCbo(newAST);
+                // Store text of the ORIGINAL QUERY
+                String originalText = ctx.getTokenRewriteStream().toString(
+                    cboCtx.nodeOfInterest.getTokenStartIndex(),
+                    cboCtx.nodeOfInterest.getTokenStopIndex());
+                createVwDesc.setViewOriginalText(originalText);
+                viewSelect = newAST;
+                viewsExpanded = new ArrayList<>();
+                viewsExpanded.add(createVwDesc.getViewName());
+              } else if (cboCtx.type == PreCboCtx.Type.CTAS) {
+                // CTAS
+                setAST(newAST);
+                newAST = reAnalyzeCTASAfterCbo(newAST);
+              }
+            }
+            Phase1Ctx ctx_1 = initPhase1Ctx();
+            if (!doPhase1(newAST, getQB(), ctx_1, null)) {
+              throw new RuntimeException("Couldn't do phase1 on CBO optimized query plan");
+            }
+            // Unfortunately, making prunedPartitions immutable is not possible
+            // here: with SemiJoins, not all tables are costed in CBO, so their
+            // PartitionList is not evaluated until the run phase.
+            getMetaData(getQB());
+
+            disableJoinMerge = defaultJoinMerge;
+            sinkOp = genPlan(getQB());
+            LOG.info("CBO Succeeded; optimized logical plan.");
+            this.ctx.setCboInfo("Plan optimized by CBO.");
+            this.ctx.setCboSucceeded(true);
+            if (LOG.isTraceEnabled()) {
+              LOG.trace(newAST.dump());
+            }
+          }
+        } catch (Exception e) {
+          boolean isMissingStats = noColsMissingStats.get() > 0;
+          if (isMissingStats) {
+            LOG.error("CBO failed due to missing column stats (see previous errors), skipping CBO");
+            this.ctx
+                .setCboInfo("Plan not optimized by CBO due to missing statistics. Please check log for more details.");
+          } else {
+            LOG.error("CBO failed, skipping CBO. ", e);
+            if (e instanceof CalciteSemanticException) {
+              CalciteSemanticException calciteSemanticException = (CalciteSemanticException) e;
+              UnsupportedFeature unsupportedFeature = calciteSemanticException
+                  .getUnsupportedFeature();
+              if (unsupportedFeature != null) {
+                this.ctx.setCboInfo("Plan not optimized by CBO due to missing feature ["
+                    + unsupportedFeature + "].");
+              } else {
+                this.ctx.setCboInfo("Plan not optimized by CBO.");
+              }
+            } else {
+              this.ctx.setCboInfo("Plan not optimized by CBO.");
+            }
+          }
+          if (e instanceof CalciteSubquerySemanticException) {
+            // The non-CBO path retries executing subqueries and throws a completely
+            // different exception/error that eclipses the original error message,
+            // so avoid executing subqueries on the non-CBO path.
+            throw new SemanticException(e);
+          } else if (e instanceof CalciteViewSemanticException) {
+            // The non-CBO path retries executing create view, and
+            // we believe it will throw the same error message.
+            throw new SemanticException(e);
+          } else if (!conf.getBoolVar(ConfVars.HIVE_IN_TEST) || isMissingStats
+              || e instanceof CalciteSemanticException) {
+            reAnalyzeAST = true;
+          } else if (e instanceof SemanticException) {
+            // Although it's likely to be a valid exception, we will retry
+            // with CBO off anyway.
+            reAnalyzeAST = true;
+          } else if (e instanceof RuntimeException) {
+            throw (RuntimeException) e;
+          } else {
+            throw new SemanticException(e);
+          }
+        } finally {
+          runCBO = false;
+          disableJoinMerge = defaultJoinMerge;
+          disableSemJoinReordering = false;
+          if (reAnalyzeAST) {
+            init(true);
+            prunedPartitions.clear();
+            // Assumption: At this point Parse Tree gen & resolution will always
+            // be true (since we started out that way).
+            super.genResolvedParseTree(ast, new PlannerContext());
+            skipCalcitePlan = true;
+          }
+        }
+      } else {
+        this.ctx.setCboInfo("Plan not optimized by CBO.");
+        skipCalcitePlan = true;
+      }
+    }
+
+    if (skipCalcitePlan) {
+      sinkOp = super.genOPTree(ast, plannerCtx);
+    }
+
+    return sinkOp;
+  }
+
+  private void handleCreateViewDDL(ASTNode newAST) throws SemanticException {
+    saveViewDefinition();
+    String originalText = createVwDesc.getViewOriginalText();
+    String expandedText = createVwDesc.getViewExpandedText();
+    List<FieldSchema> schema = createVwDesc.getSchema();
+    List<FieldSchema> partitionColumns = createVwDesc.getPartCols();
+    init(false);
+    setAST(newAST);
+    newAST = reAnalyzeViewAfterCbo(newAST);
+    createVwDesc.setViewOriginalText(originalText);
+    createVwDesc.setViewExpandedText(expandedText);
+    createVwDesc.setSchema(schema);
+    createVwDesc.setPartCols(partitionColumns);
+  }
+
+  /*
+   * Tries to optimize the FROM clause of a multi-insert. It makes no attempt to
+   * optimize the insert clauses of the query. Sets runCBO to false if the FROM
+   * clause cannot be rewritten.
+   */
+  private void handleMultiDestQuery(ASTNode ast, PreCboCtx cboCtx) throws SemanticException {
+    // Not supported by CBO
+    if (!runCBO) {
+      return;
+    }
+    // Currently, we only optimize the content of the FROM clause for
+    // multi-insert queries. Thus, nodeOfInterest is the FROM clause.
+    if (isJoinToken(cboCtx.nodeOfInterest)) {
+      // Join clause: rewriting is needed
+      ASTNode subq = rewriteASTForMultiInsert(ast, cboCtx.nodeOfInterest);
+      if (subq != null) {
+        // We could rewrite into a subquery
+        cboCtx.nodeOfInterest = (ASTNode) subq.getChild(0);
+        QB newQB = new QB(null, "", false);
+        Phase1Ctx ctx_1 = initPhase1Ctx();
+        doPhase1(cboCtx.nodeOfInterest, newQB, ctx_1, null);
+        setQB(newQB);
+        getMetaData(getQB());
+      } else {
+        runCBO = false;
+      }
+    } else if (cboCtx.nodeOfInterest.getToken().getType() == HiveParser.TOK_SUBQUERY) {
+      // Subquery: no rewriting needed.
+      ASTNode subq = cboCtx.nodeOfInterest;
+      // First child is the subquery, second child is the alias.
+      // We set the node of interest and QB to the subquery.
+      // We do not need to generate the QB again; rather, we use it directly.
+      cboCtx.nodeOfInterest = (ASTNode) subq.getChild(0);
+      String subQAlias = unescapeIdentifier(subq.getChild(1).getText());
+      final QB newQB = getQB().getSubqForAlias(subQAlias).getQB();
+      newQB.getParseInfo().setAlias("");
+      newQB.getParseInfo().setIsSubQ(false);
+      setQB(newQB);
+    } else {
+      // No need to run CBO (table ref or virtual table) or not supported
+      runCBO = false;
+    }
+  }
+
+  private ASTNode rewriteASTForMultiInsert(ASTNode query, ASTNode nodeOfInterest) {
+    // 1. gather references from original query
+    // This is a map from aliases to references.
+    // We keep all references as we will need to modify them after creating
+    // the subquery.
+    final Multimap<String, Object> aliasNodes = ArrayListMultimap.create();
+    // To know if we need to bail out
+    final AtomicBoolean notSupported = new AtomicBoolean(false);
+    TreeVisitorAction action = new TreeVisitorAction() {
+      @Override
+      public Object pre(Object t) {
+        if (!notSupported.get()) {
+          if (ParseDriver.adaptor.getType(t) == HiveParser.TOK_ALLCOLREF) {
+            // TODO: this is a limitation of the AST rewriting approach that we will
+            // not be able to overcome until proper integration of full multi-insert
+            // queries with Calcite is implemented.
+            // The current rewriting gathers references from insert clauses and then
+            // updates them with the new subquery references.
+            // However, if insert clauses use * or tab.*, we cannot resolve the
+            // columns that we are referring to. Thus, we just bail out, and those
+            // queries will not be currently optimized by Calcite.
+            // An example of such a query is:
+            //   FROM T_A a LEFT JOIN T_B b ON a.id = b.id
+            //   INSERT OVERWRITE TABLE join_result_1
+            //   SELECT a.*, b.*
+            //   INSERT OVERWRITE TABLE join_result_3
+            //   SELECT a.*, b.*;
+            notSupported.set(true);
+          } else if (ParseDriver.adaptor.getType(t) == HiveParser.DOT) {
+            Object c = ParseDriver.adaptor.getChild(t, 0);
+            if (c != null && ParseDriver.adaptor.getType(c) == HiveParser.TOK_TABLE_OR_COL) {
+              aliasNodes.put(((ASTNode) t).toStringTree(), t);
+            }
+          } else if (ParseDriver.adaptor.getType(t) == HiveParser.TOK_TABLE_OR_COL) {
+            Object p = ParseDriver.adaptor.getParent(t);
+            if (p == null || ParseDriver.adaptor.getType(p) != HiveParser.DOT) {
+              aliasNodes.put(((ASTNode) t).toStringTree(), t);
+            }
+          }
+        }
+        return t;
+      }
+      @Override
+      public Object post(Object t) {
+        return t;
+      }
+    };
+    TreeVisitor tv = new TreeVisitor(ParseDriver.adaptor);
+    // We will iterate through the children: if it is an INSERT, we will traverse
+    // the subtree to gather the references.
+    for (int i = 0; i < query.getChildCount(); i++) {
+      ASTNode child = (ASTNode) query.getChild(i);
+      if (ParseDriver.adaptor.getType(child) != HiveParser.TOK_INSERT) {
+        // If it is not an INSERT, we do not need to do anything
+        continue;
+      }
+      tv.visit(child, action);
+    }
+    if (notSupported.get()) {
+      // Bail out
+      return null;
+    }
+    // 2. rewrite into query
+    //   TOK_QUERY
+    //     TOK_FROM
+    //       join
+    //     TOK_INSERT
+    //       TOK_DESTINATION
+    //         TOK_DIR
+    //           TOK_TMP_FILE
+    //       TOK_SELECT
+    //         refs
+    ASTNode from = new ASTNode(new CommonToken(HiveParser.TOK_FROM, "TOK_FROM"));
+    from.addChild((ASTNode) ParseDriver.adaptor.dupTree(nodeOfInterest));
+    ASTNode destination = new ASTNode(new CommonToken(HiveParser.TOK_DESTINATION, "TOK_DESTINATION"));
+    ASTNode dir = new ASTNode(new CommonToken(HiveParser.TOK_DIR, "TOK_DIR"));
+    ASTNode tmpFile = new ASTNode(new CommonToken(HiveParser.TOK_TMP_FILE, "TOK_TMP_FILE"));
+    dir.addChild(tmpFile);
+    destination.addChild(dir);
+    ASTNode select = new ASTNode(new CommonToken(HiveParser.TOK_SELECT, "TOK_SELECT"));
+    int num = 0;
+    for (Collection<Object> selectIdentifier : aliasNodes.asMap().values()) {
+      Iterator<Object> it = selectIdentifier.iterator();
+      ASTNode node = (ASTNode) it.next();
+      // Add select expression
+      ASTNode selectExpr = new ASTNode(new CommonToken(HiveParser.TOK_SELEXPR, "TOK_SELEXPR"));
+      selectExpr.addChild((ASTNode) ParseDriver.adaptor.dupTree(node)); // Identifier
+      String colAlias = "col" + num;
+      selectExpr.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, colAlias))); // Alias
+      select.addChild(selectExpr);
+      // Rewrite all INSERT references (all the node values for this key)
+      ASTNode colExpr = new ASTNode(new CommonToken(HiveParser.TOK_TABLE_OR_COL, "TOK_TABLE_OR_COL"));
+      colExpr.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, colAlias)));
+      replaceASTChild(node, colExpr);
+      while (it.hasNext()) {
+        // Loop to rewrite rest of INSERT references
+        node = (ASTNode) it.next();
+        colExpr = new ASTNode(new CommonToken(HiveParser.TOK_TABLE_OR_COL, "TOK_TABLE_OR_COL"));
+        colExpr.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, colAlias)));
+        replaceASTChild(node, colExpr);
+      }
+      num++;
+    }
+    ASTNode insert = new ASTNode(new CommonToken(HiveParser.TOK_INSERT, "TOK_INSERT"));
+    insert.addChild(destination);
+    insert.addChild(select);
+    ASTNode newQuery = new ASTNode(new CommonToken(HiveParser.TOK_QUERY, "TOK_QUERY"));
+    newQuery.addChild(from);
+    newQuery.addChild(insert);
+    // 3. create subquery
+    ASTNode subq = new ASTNode(new CommonToken(HiveParser.TOK_SUBQUERY, "TOK_SUBQUERY"));
+    subq.addChild(newQuery);
+    subq.addChild(new ASTNode(new CommonToken(HiveParser.Identifier, "subq")));
+    replaceASTChild(nodeOfInterest, subq);
+    // 4. return subquery
+    return subq;
+  }
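// Editor's sketch of the rewrite above at the SQL level (tables T_A/T_B are
// hypothetical; the aliases col0, col1 and "subq" come from the code above):
//
//   FROM T_A a JOIN T_B b ON a.id = b.id
//   INSERT OVERWRITE TABLE t1 SELECT a.id
//   INSERT OVERWRITE TABLE t2 SELECT b.id
//
// becomes, in effect,
//
//   FROM (SELECT a.id AS col0, b.id AS col1
//         FROM T_A a JOIN T_B b ON a.id = b.id) subq
//   INSERT OVERWRITE TABLE t1 SELECT col0
//   INSERT OVERWRITE TABLE t2 SELECT col1
//
// so that the shared FROM clause can be optimized once by Calcite.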
+
+  /**
+   * Can CBO handle the given AST?
+   *
+   * @param ast
+   *          Top level AST
+   * @param qb
+   *          top level QB corresponding to the AST
+   * @param cboCtx
+   * @return boolean
+   *
+   *         Assumption:<br>
+   *         If the top level QB is a query, then everything below it must also
+   *         be a query.
+   */
+  boolean canCBOHandleAst(ASTNode ast, QB qb, PreCboCtx cboCtx) {
+    int root = ast.getToken().getType();
+    boolean needToLogMessage = STATIC_LOG.isInfoEnabled();
+    boolean isSupportedRoot = root == HiveParser.TOK_QUERY || root == HiveParser.TOK_EXPLAIN
+        || qb.isCTAS() || qb.isMaterializedView();
+    // Queries without a source table currently are not supported by CBO
+    boolean isSupportedType = (qb.getIsQuery() && !qb.containsQueryWithoutSourceTable())
+        || qb.isCTAS() || qb.isMaterializedView() || cboCtx.type == PreCboCtx.Type.INSERT
+        || cboCtx.type == PreCboCtx.Type.MULTI_INSERT;
+    boolean noBadTokens = HiveCalciteUtil.validateASTForUnsupportedTokens(ast);
+    boolean result = isSupportedRoot && isSupportedType && noBadTokens;
+
+    if (!result) {
+      if (needToLogMessage) {
+        String msg = "";
+        if (!isSupportedRoot) {
+          msg += "doesn't have QUERY or EXPLAIN as root and not a CTAS; ";
+        }
+        if (!isSupportedType) {
+          msg += "is not a query with at least one source table "
+              + " or there is a subquery without a source table, or CTAS, or insert; ";
+        }
+        if (!noBadTokens) {
+          msg += "has unsupported tokens; ";
+        }
+
+        if (msg.isEmpty()) {
+          msg += "has some unspecified limitations; ";
+        }
+        STATIC_LOG.info("Not invoking CBO because the statement "
+            + msg.substring(0, msg.length() - 2));
+      }
+      return false;
+    }
+    // Now check the QB in more detail. canHandleQbForCbo returns null if the
+    // query can be handled.
+    String msg = CalcitePlanner.canHandleQbForCbo(queryProperties, conf, true, needToLogMessage, qb);
+    if (msg == null) {
+      return true;
+    }
+    if (needToLogMessage) {
+      STATIC_LOG.info("Not invoking CBO because the statement "
+          + msg.substring(0, msg.length() - 2));
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether Calcite can handle the query.
+   *
+   * @param queryProperties
+   * @param conf
+   * @param topLevelQB
+   *          Does the QB correspond to the top-most query block?
+   * @param verbose
+   *          Whether the return value should be verbose in case of failure.
+   * @return null if the query can be handled; non-null reason string if it
+   *         cannot be.
+   *
+   *         Assumption:<br>
+   *         1. If the top level QB is a query, then everything below it must
+   *         also be a query<br>
+   *         2. A nested subquery will return false for qbToChk.getIsQuery()
+   */
+  static String canHandleQbForCbo(QueryProperties queryProperties, HiveConf conf,
+      boolean topLevelQB, boolean verbose, QB qb) {
+
+    if (!queryProperties.hasClusterBy() && !queryProperties.hasDistributeBy()
+        && !queryProperties.hasSortBy() && !queryProperties.hasPTF() && !queryProperties.usesScript()
+        && !queryProperties.hasLateralViews()) {
+      // Ok to run CBO.
+      return null;
+    }
+
+    // Not ok to run CBO, build error message.
+    String msg = "";
+    if (verbose) {
+      if (queryProperties.hasClusterBy())
+        msg += "has cluster by; ";
+      if (queryProperties.hasDistributeBy())
+        msg += "has distribute by; ";
+      if (queryProperties.hasSortBy())
+        msg += "has sort by; ";
+      if (queryProperties.hasPTF())
+        msg += "has PTF; ";
+      if (queryProperties.usesScript())
+        msg += "uses scripts; ";
+      if (queryProperties.hasLateralViews())
+        msg += "has lateral views; ";
+
+      if (msg.isEmpty())
+        msg += "has some unspecified limitations; ";
+    }
+    return msg;
+  }
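// Editor's examples (not exhaustive) of statements the two checks above reject,
// matched to the reason strings they build:
//   SELECT ... CLUSTER BY k                 -> "has cluster by"
//   SELECT TRANSFORM(c) USING 'script' ...  -> "uses scripts"
//   SELECT ... LATERAL VIEW explode(arr) t  -> "has lateral views"
// A plain SELECT, CTAS, or materialized-view definition over at least one
// source table passes both checks and proceeds to CBO.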
+
+  /* This method inserts the right profiles into profilesCBO depending
+   * on the query characteristics. */
+  private static EnumSet<ExtendedCBOProfile> obtainCBOProfiles(QueryProperties queryProperties) {
+    EnumSet<ExtendedCBOProfile> profilesCBO = EnumSet.noneOf(ExtendedCBOProfile.class);
+    // If the query contains more than one join
+    if (queryProperties.getJoinCount() > 1) {
+      profilesCBO.add(ExtendedCBOProfile.JOIN_REORDERING);
+    }
+    // If the query contains windowing processing
+    if (queryProperties.hasWindowing()) {
+      profilesCBO.add(ExtendedCBOProfile.WINDOWING_POSTPROCESSING);
+    }
+    return profilesCBO;
+  }
+
+  @Override
+  boolean isCBOExecuted() {
+    return runCBO;
+  }
+
+  @Override
+  boolean continueJoinMerge() {
+    return !(runCBO && disableSemJoinReordering);
+  }
+
+  @Override
+  Table materializeCTE(String cteName, CTEClause cte) throws HiveException {
+
+    ASTNode createTable = new ASTNode(new ClassicToken(HiveParser.TOK_CREATETABLE));
+
+    ASTNode tableName = new ASTNode(new ClassicToken(HiveParser.TOK_TABNAME));
+    tableName.addChild(new ASTNode(new ClassicToken(HiveParser.Identifier, cteName)));
+
+    ASTNode temporary = new ASTNode(new ClassicToken(HiveParser.KW_TEMPORARY, MATERIALIZATION_MARKER));
+
+    createTable.addChild(tableName);
+    createTable.addChild(temporary);
+    createTable.addChild(cte.cteNode);
+
+    CalcitePlanner analyzer = new CalcitePlanner(queryState);
+    analyzer.initCtx(ctx);
+    analyzer.init(false);
+
+    // should share cte contexts
+    analyzer.aliasToCTEs.putAll(aliasToCTEs);
+
+    HiveOperation operation = queryState.getHiveOperation();
+    try {
+      analyzer.analyzeInternal(createTable);
+    } finally {
+      queryState.setCommandType(operation);
+    }
+
+    Table table = analyzer.tableDesc.toTable(conf);
+    Path location = table.getDataLocation();
+    try {
+      location.getFileSystem(conf).mkdirs(location);
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+    table.setMaterializedTable(true);
+
+    LOG.info(cteName + " will be materialized into " + location);
+    cte.table = table;
+    cte.source = analyzer;
+
+    ctx.addMaterializedTable(cteName, table);
+    // For CalcitePlanner, store the qualified name too
+    ctx.addMaterializedTable(table.getDbName() + "." + table.getTableName(), table);
+
+    return table;
+  }
+
+  @Override
+  String fixCtasColumnName(String colName) {
+    if (runCBO) {
+      int lastDot = colName.lastIndexOf('.');
+      if (lastDot < 0)
+        return colName; // alias is not fully qualified
+      String nqColumnName = colName.substring(lastDot + 1);
+      STATIC_LOG.debug("Replacing " + colName + " (produced by CBO) by " + nqColumnName);
+      return nqColumnName;
+    }
+
+    return super.fixCtasColumnName(colName);
+  }
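// Editor's sketch of fixCtasColumnName's effect under CBO (hypothetical alias):
//   "subq.id" -> "id"   (everything up to the last '.' is stripped)
//   "id"      -> "id"   (no dot: returned unchanged)
// Without CBO, the call is simply delegated to SemanticAnalyzer.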
+
+  /**
+   * The context that doPhase1 uses to populate information pertaining to CBO
+   * (currently, this is used for CTAS and insert-as-select).
+   */
+  static class PreCboCtx extends PlannerContext {
+    enum Type {
+      NONE, INSERT, MULTI_INSERT, CTAS, VIEW, UNEXPECTED
+    }
+
+    private ASTNode nodeOfInterest;
+    private Type type = Type.NONE;
+
+    private void set(Type type, ASTNode ast) {
+      if (this.type != Type.NONE) {
+        STATIC_LOG.warn("Setting " + type + " when already " + this.type + "; node " + ast.dump()
+            + " vs old node " + nodeOfInterest.dump());
+        this.type = Type.UNEXPECTED;
+        return;
+      }
+      this.type = type;
+      this.nodeOfInterest = ast;
+    }
+
+    @Override
+    void setCTASToken(ASTNode child) {
+      set(PreCboCtx.Type.CTAS, child);
+    }
+
+    @Override
+    void setViewToken(ASTNode child) {
+      set(PreCboCtx.Type.VIEW, child);
+    }
+
+    @Override
+    void setInsertToken(ASTNode ast, boolean isTmpFileDest) {
+      if (!isTmpFileDest) {
+        set(PreCboCtx.Type.INSERT, ast);
+      }
+    }
+
+    @Override
+    void setMultiInsertToken(ASTNode child) {
+      set(PreCboCtx.Type.MULTI_INSERT, child);
+    }
+
+    @Override
+    void resetToken() {
+      this.type = Type.NONE;
+      this.nodeOfInterest = null;
+    }
+  }
+
+  ASTNode fixUpAfterCbo(ASTNode originalAst, ASTNode newAst, PreCboCtx cboCtx)
+      throws SemanticException {
+    switch (cboCtx.type) {
+
+    case NONE:
+      // nothing to do
+      return newAst;
+
+    case CTAS:
+    case VIEW: {
+      // Patch the optimized query back into the original CTAS AST, replacing
+      // the original query.
+      replaceASTChild(cboCtx.nodeOfInterest, newAst);
+      return originalAst;
+    }
+
+    case INSERT: {
+      // We need to patch the original dest back into the new query.
+      // This makes assumptions about the structure of the AST.
+      ASTNode newDest = new ASTSearcher().simpleBreadthFirstSearch(newAst, HiveParser.TOK_QUERY,
+          HiveParser.TOK_INSERT, HiveParser.TOK_DESTINATION);
+      if (newDest == null) {
+        LOG.error("Cannot find destination after CBO; new ast is " + newAst.dump());
+        throw new SemanticException("Cannot find destination after CBO");
+      }
+      replaceASTChild(newDest, cboCtx.nodeOfInterest);
+      return newAst;
+    }
+
+    case MULTI_INSERT: {
+      // Patch the optimized query back into the original FROM clause.
+      replaceASTChild(cboCtx.nodeOfInterest, newAst);
+      return originalAst;
+    }
+
+    default:
+      throw new AssertionError("Unexpected type " + cboCtx.type);
+    }
+  }
+
+  ASTNode reAnalyzeCTASAfterCbo(ASTNode newAst) throws SemanticException {
+    // analyzeCreateTable uses this.ast, but doPhase1 doesn't, so only reset it
+    // here.
+    newAst = analyzeCreateTable(newAst, getQB(), null);
+    if (newAst == null) {
+      LOG.error("analyzeCreateTable failed to initialize CTAS after CBO;" + " new ast is "
+          + getAST().dump());
+      throw new SemanticException("analyzeCreateTable failed to initialize CTAS after CBO");
+    }
+    return newAst;
+  }
+
+  ASTNode reAnalyzeViewAfterCbo(ASTNode newAst) throws SemanticException {
+    // analyzeCreateView uses this.ast, but doPhase1 doesn't, so only reset it
+    // here.
+    newAst = analyzeCreateView(newAst, getQB(), null);
+    if (newAst == null) {
+      LOG.error("analyzeCreateView failed to initialize materialized view after CBO;" + " new ast is "
+          + getAST().dump());
+      throw new SemanticException("analyzeCreateView failed to initialize materialized view after CBO");
+    }
+    return newAst;
+  }
+
+  public static class ASTSearcher {
+    private final LinkedList<ASTNode> searchQueue = new LinkedList<ASTNode>();
+
+    /**
+     * Performs breadth-first search of the AST for a nested set of tokens. Tokens
+     * don't have to be each other's direct children; they can be separated by
+     * layers of other tokens. For each token in the list, the first one found is
+     * matched and there's no backtracking; thus, if the AST has multiple instances
+     * of some token, of which only one matches, it is not guaranteed to be found.
+     * We use this for simple things. Not thread-safe - reuses searchQueue.
+     */
+    public ASTNode simpleBreadthFirstSearch(ASTNode ast, int... tokens) {
+      searchQueue.clear();
+      searchQueue.add(ast);
+      for (int i = 0; i < tokens.length; ++i) {
+        boolean found = false;
+        int token = tokens[i];
+        while (!searchQueue.isEmpty() && !found) {
+          ASTNode next = searchQueue.poll();
+          found = next.getType() == token;
+          if (found) {
+            if (i == tokens.length - 1)
+              return next;
+            searchQueue.clear();
+          }
+          for (int j = 0; j < next.getChildCount(); ++j) {
+            searchQueue.add((ASTNode) next.getChild(j));
+          }
+        }
+        if (!found)
+          return null;
+      }
+      return null;
+    }
+
+    // Note: despite the name, this walks the tree in breadth-first order too,
+    // since it polls from the same FIFO searchQueue as the method above.
+    public ASTNode depthFirstSearch(ASTNode ast, int token) {
+      searchQueue.clear();
+      searchQueue.add(ast);
+      while (!searchQueue.isEmpty()) {
+        ASTNode next = searchQueue.poll();
+        if (next.getType() == token) return next;
+        for (int j = 0; j < next.getChildCount(); ++j) {
+          searchQueue.add((ASTNode) next.getChild(j));
+        }
+      }
+      return null;
+    }
+
+    public ASTNode simpleBreadthFirstSearchAny(ASTNode ast, int... tokens) {
+      searchQueue.clear();
+      searchQueue.add(ast);
+      while (!searchQueue.isEmpty()) {
+        ASTNode next = searchQueue.poll();
+        for (int i = 0; i < tokens.length; ++i) {
+          if (next.getType() == tokens[i]) return next;
+        }
+        for (int i = 0; i < next.getChildCount(); ++i) {
+          searchQueue.add((ASTNode) next.getChild(i));
+        }
+      }
+      return null;
+    }
+
+    public void reset() {
+      searchQueue.clear();
+    }
+  }
+
+  private static void replaceASTChild(ASTNode child, ASTNode newChild) {
+    ASTNode parent = (ASTNode) child.parent;
+    int childIndex = child.childIndex;
+    parent.deleteChild(childIndex);
+    parent.insertChild(childIndex, newChild);
+  }
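// Editor's usage sketch for ASTSearcher, mirroring the lookup that
// fixUpAfterCbo() above performs (assumes `newAst` is an optimized query AST):
//
//   ASTNode dest = new ASTSearcher().simpleBreadthFirstSearch(newAst,
//       HiveParser.TOK_QUERY, HiveParser.TOK_INSERT, HiveParser.TOK_DESTINATION);
//   // dest == null when any token in the chain is not found along the first
//   // matched path; as documented above, there is no backtracking.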
+
+  /**
+   * Get the optimized logical plan for the given QB tree in the semAnalyzer.
+   *
+   * @return
+   * @throws SemanticException
+   */
+  RelNode logicalPlan() throws SemanticException {
+    RelNode optimizedOptiqPlan = null;
+
+    CalcitePlannerAction calcitePlannerAction = null;
+    if (this.columnAccessInfo == null) {
+      this.columnAccessInfo = new ColumnAccessInfo();
+    }
+    calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, this.columnAccessInfo);
+
+    try {
+      optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks
+          .newConfigBuilder().typeSystem(new HiveTypeSystemImpl()).build());
+    } catch (Exception e) {
+      rethrowCalciteException(e);
+      throw new AssertionError("rethrowCalciteException didn't throw for " + e.getMessage());
+    }
+    return optimizedOptiqPlan;
+  }
+
+  /**
+   * Get the optimized AST for the given QB tree in the semAnalyzer.
+   *
+   * @return Optimized operator tree translated into a Hive AST
+   * @throws SemanticException
+   */
+  ASTNode getOptimizedAST() throws SemanticException {
+    RelNode optimizedOptiqPlan = logicalPlan();
+    ASTNode optiqOptimizedAST = ASTConverter.convert(optimizedOptiqPlan, resultSchema,
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COLUMN_ALIGNMENT));
+    return optiqOptimizedAST;
+  }
+
+  /**
+   * Get the optimized Hive operator DAG for the given QB tree in the semAnalyzer.
+   *
+   * @return Optimized Hive operator tree
+   * @throws SemanticException
+   */
+  Operator getOptimizedHiveOPDag() throws SemanticException {
+    RelNode optimizedOptiqPlan = null;
+    CalcitePlannerAction calcitePlannerAction = null;
+    if (this.columnAccessInfo == null) {
+      this.columnAccessInfo = new ColumnAccessInfo();
+    }
+    calcitePlannerAction = new CalcitePlannerAction(prunedPartitions, this.columnAccessInfo);
+
+    try {
+      optimizedOptiqPlan = Frameworks.withPlanner(calcitePlannerAction, Frameworks
+          .newConfigBuilder().typeSystem(new HiveTypeSystemImpl()).build());
+    } catch (Exception e) {
+      rethrowCalciteException(e);
+      throw new AssertionError("rethrowCalciteException didn't throw for " + e.getMessage());
+    }
+
+    RelNode modifiedOptimizedOptiqPlan = PlanModifierForReturnPath.convertOpTree(
+        optimizedOptiqPlan, resultSchema, this.getQB().getTableDesc() != null);
+
+    LOG.debug("Translating the following plan:\n" + RelOptUtil.toString(modifiedOptimizedOptiqPlan));
+    Operator<?> hiveRoot = new HiveOpConverter(this, conf, unparseTranslator, topOps)
+        .convert(modifiedOptimizedOptiqPlan);
+    RowResolver hiveRootRR = genRowResolver(hiveRoot, getQB());
+    opParseCtx.put(hiveRoot, new OpParseContext(hiveRootRR));
+    String dest = getQB().getParseInfo().getClauseNames().iterator().next();
+    if (getQB().getParseInfo().getDestSchemaForClause(dest) != null
+        && this.getQB().getTableDesc() == null) {
+      Operator<?> selOp = handleInsertStatement(dest, hiveRoot, hiveRootRR, getQB());
+      return genFileSinkPlan(dest, getQB(), selOp);
+    } else {
+      return genFileSinkPlan(dest, getQB(), hiveRoot);
+    }
+  }
+
+  // This function serves as the wrapper of handleInsertStatementSpec in
+  // SemanticAnalyzer
+  Operator<?> handleInsertStatement(String dest, Operator<?> input, RowResolver inputRR, QB qb)
+      throws SemanticException {
+    ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>();
+    ArrayList<ColumnInfo> columns = inputRR.getColumnInfos();
+    for (int i = 0; i < columns.size(); i++) {
+      ColumnInfo col = columns.get(i);
+      colList.add(new ExprNodeColumnDesc(col));
+    }
+    ASTNode selExprList = qb.getParseInfo().getSelForClause(dest);
+
+    RowResolver out_rwsch = handleInsertStatementSpec(colList, dest, inputRR, inputRR, qb,
+        selExprList);
+
+    ArrayList<String> columnNames = new ArrayList<String>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+    for (int i = 0; i < colList.size(); i++) {
+      String outputCol = getColumnInternalName(i);
+      colExprMap.put(outputCol, colList.get(i));
+      columnNames.add(outputCol);
+    }
+    Operator<?> output = putOpInsertMap(OperatorFactory.getAndMakeChild(new SelectDesc(colList,
+        columnNames), new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch);
+    output.setColumnExprMap(colExprMap);
+    return output;
+  }
+
+  /***
+   * Unwraps Calcite invocation exceptions coming from the metadata provider
+   * chain and obtains the real cause.
+   *
+   * @param e
+   */
+  private void rethrowCalciteException(Exception e) throws SemanticException {
+    Throwable first = (semanticException != null) ? semanticException : e, current = first, cause = current
+        .getCause();
+    while (cause != null) {
+      Throwable causeOfCause = cause.getCause();
+      if (current == first && causeOfCause == null && isUselessCause(first)) {
+        // "cause" is a root cause, and "e"/"first" is a useless
+        // exception it's wrapped in.
+        first = cause;
+        break;
+      } else if (causeOfCause != null && isUselessCause(cause)
+          && ExceptionHelper.resetCause(current, causeOfCause)) {
+        // "cause" was a useless intermediate cause, so we replaced it
+        // with its own cause.
+        cause = causeOfCause;
+        continue; // do loop once again with the new cause of "current"
+      }
+      current = cause;
+      cause = current.getCause();
+    }
+
+    if (first instanceof RuntimeException) {
+      throw (RuntimeException) first;
+    } else if (first instanceof SemanticException) {
+      throw (SemanticException) first;
+    }
+    throw new RuntimeException(first);
+  }
+
+  private static class ExceptionHelper {
+    private static final Field CAUSE_FIELD = getField(Throwable.class, "cause"),
+        TARGET_FIELD = getField(InvocationTargetException.class, "target"),
+        MESSAGE_FIELD = getField(Throwable.class, "detailMessage");
+
+    private static Field getField(Class<?> clazz, String name) {
+      try {
+        Field f = clazz.getDeclaredField(name);
+        f.setAccessible(true);
+        return f;
+      } catch (Throwable t) {
+        return null;
+      }
+    }
+
+    public static boolean resetCause(Throwable target, Throwable newCause) {
+      try {
+        if (MESSAGE_FIELD == null)
+          return false;
+        Field field = (target instanceof InvocationTargetException) ? TARGET_FIELD : CAUSE_FIELD;
+        if (field == null)
+          return false;
+
+        Throwable oldCause = target.getCause();
+        String oldMsg = target.getMessage();
+        field.set(target, newCause);
+        if (oldMsg != null && oldMsg.equals(oldCause.toString())) {
+          MESSAGE_FIELD.set(target, newCause == null ? null : newCause.toString());
+        }
+      } catch (Throwable se) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  private boolean isUselessCause(Throwable t) {
+    return t instanceof RuntimeException || t instanceof InvocationTargetException
+        || t instanceof UndeclaredThrowableException;
+  }
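// Editor's sketch of the unwrapping performed above, assuming the metadata
// provider wrapped the real failure in reflection machinery:
//
//   RuntimeException                     (useless outer wrapper)
//     -> UndeclaredThrowableException    (useless intermediate, spliced out
//       -> InvocationTargetException      via ExceptionHelper.resetCause)
//         -> CalciteSemanticException    (the cause that is finally rethrown)
//
// The reflection on Throwable.cause / detailMessage is best-effort: if any
// field lookup fails, resetCause returns false and the chain is left intact.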
+
+  private RowResolver genRowResolver(Operator op, QB qb) {
+    RowResolver rr = new RowResolver();
+    String subqAlias = (qb.getAliases().size() == 1 && qb.getSubqAliases().size() == 1) ? qb
+        .getAliases().get(0) : null;
+
+    for (ColumnInfo ci : op.getSchema().getSignature()) {
+      try {
+        rr.putWithCheck((subqAlias != null) ? subqAlias : ci.getTabAlias(),
+            ci.getAlias() != null ? ci.getAlias() : ci.getInternalName(), ci.getInternalName(),
+            new ColumnInfo(ci));
+      } catch (SemanticException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return rr;
+  }
+
+  private enum ExtendedCBOProfile {
+    JOIN_REORDERING,
+    WINDOWING_POSTPROCESSING;
+  }
+
+  /**
+   * Code responsible for Calcite plan generation and optimization.
+   */
+  private class CalcitePlannerAction implements Frameworks.PlannerAction<RelNode> {
+    private RelOptCluster cluster;
+    private RelOptSchema relOptSchema;
+    private final Map<String, PrunedPartitionList> partitionCache;
+    private final ColumnAccessInfo columnAccessInfo;
+    private Map<HiveProject, Table> viewProjectToTableSchema;
+
+    // Correlated vars across subqueries within the same query need to have
+    // different IDs; this will be used in RexNodeConverter to create cor vars.
+    private int subqueryId;
+
+    // This keeps track of whether a subquery is correlated and contains an
+    // aggregate, since this is special-cased when it is rewritten in
+    // SubqueryRemoveRule.
+    Set<RelNode> corrScalarRexSQWithAgg = new HashSet<RelNode>();
+
+    // TODO: Do we need to keep track of RR, ColNameToPosMap for every op or
+    // just the last one.
+    LinkedHashMap<RelNode, RowResolver> relToHiveRR = new LinkedHashMap<RelNode, RowResolver>();
+    LinkedHashMap<RelNode, ImmutableMap<String, Integer>> relToHiveColNameCalcitePosMap = new LinkedHashMap<RelNode, ImmutableMap<String, Integer>>();
+
+    CalcitePlannerAction(Map<String, PrunedPartitionList> partitionCache, ColumnAccessInfo columnAccessInfo) {
+      this.partitionCache = partitionCache;
+      this.columnAccessInfo = columnAccessInfo;
+    }
+
+    @Override
+    public RelNode apply(RelOptCluster cluster, RelOptSchema relOptSchema, SchemaPlus rootSchema) {
+      RelNode calciteGenPlan = null;
+      RelNode calcitePreCboPlan = null;
+      RelNode calciteOptimizedPlan = null;
+      subqueryId = 0;
+
+      /*
+       * recreate cluster, so that it picks up the additional traitDef
+       */
+      final Double maxSplitSize = (double) HiveConf.getLongVar(
+          conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
+      final Double maxMemory = (double) HiveConf.getLongVar(
+          conf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+      HiveAlgorithmsConf algorithmsConf = new HiveAlgorithmsConf(maxSplitSize, maxMemory);
+      HiveRulesRegistry registry = new HiveRulesRegistry();
+      Properties calciteConfigProperties = new Properties();
+      calciteConfigProperties.setProperty(
+          CalciteConnectionProperty.MATERIALIZATIONS_ENABLED.camelName(),
+          Boolean.FALSE.toString());
+      CalciteConnectionConfig calciteConfig = new CalciteConnectionConfigImpl(calciteConfigProperties);
+      HivePlannerContext confContext = new HivePlannerContext(algorithmsConf, registry, calciteConfig,
+          corrScalarRexSQWithAgg);
+      RelOptPlanner planner = HiveVolcanoPlanner.createPlanner(confContext);
+      final RexBuilder rexBuilder = cluster.getRexBuilder();
+      final RelOptCluster optCluster = RelOptCluster.create(planner, rexBuilder);
+
+      this.cluster = optCluster;
+      this.relOptSchema = relOptSchema;
+
+      PerfLogger perfLogger = SessionState.getPerfLogger();
+
+      // 1. Gen Calcite Plan
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+      try {
+        calciteGenPlan = genLogicalPlan(getQB(), true, null, null);
+        // if it is to create a view, we do not use table aliases
+        resultSchema = SemanticAnalyzer.convertRowSchemaToResultSetSchema(
+            relToHiveRR.get(calciteGenPlan),
+            getQB().isView() ? false : HiveConf.getBoolVar(conf,
+                HiveConf.ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES));
+      } catch (SemanticException e) {
+        semanticException = e;
+        throw new RuntimeException(e);
+      }
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Plan generation");
+
+      // Create executor
+      RexExecutor executorProvider = new HiveRexExecutorImpl(optCluster);
+      calciteGenPlan.getCluster().getPlanner().setExecutor(executorProvider);
+
+      // We need to get the ColumnAccessInfo and viewToTableSchema for views.
+      HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null,
+          HiveRelFactories.HIVE_BUILDER.create(optCluster, null), this.columnAccessInfo,
+          this.viewProjectToTableSchema);
+
+      fieldTrimmer.trim(calciteGenPlan);
+
+      // Create and set MD provider
+      HiveDefaultRelMetadataProvider mdProvider = new HiveDefaultRelMetadataProvider(conf);
+      RelMetadataQuery.THREAD_PROVIDERS.set(
+          JaninoRelMetadataProvider.of(mdProvider.getMetadataProvider()));
+
+      // Remove subqueries
+      LOG.debug("Plan before removing subquery:\n" + RelOptUtil.toString(calciteGenPlan));
+      calciteGenPlan = hepPlan(calciteGenPlan, false, mdProvider.getMetadataProvider(), null,
+          HiveSubQueryRemoveRule.REL_NODE);
+      LOG.debug("Plan just after removing subquery:\n" + RelOptUtil.toString(calciteGenPlan));
+
+      calciteGenPlan = HiveRelDecorrelator.decorrelateQuery(calciteGenPlan);
+      LOG.debug("Plan after decorrelation:\n" + RelOptUtil.toString(calciteGenPlan));
+
+      // 2. Apply pre-join order optimizations
+      calcitePreCboPlan = applyPreJoinOrderingTransforms(calciteGenPlan,
+          mdProvider.getMetadataProvider(), executorProvider);
+
+      // 3. Apply join order optimizations: reordering MST algorithm
+      // If join optimizations failed because of missing stats, we continue with
+      // the rest of optimizations
+      if (profilesCBO.contains(ExtendedCBOProfile.JOIN_REORDERING)) {
+        perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+        try {
+          List<RelMetadataProvider> list = Lists.newArrayList();
+          list.add(mdProvider.getMetadataProvider());
+          RelTraitSet desiredTraits = optCluster
+              .traitSetOf(HiveRelNode.CONVENTION, RelCollations.EMPTY);
+
+          HepProgramBuilder hepPgmBldr = new HepProgramBuilder().addMatchOrder(HepMatchOrder.BOTTOM_UP);
+          hepPgmBldr.addRuleInstance(new JoinToMultiJoinRule(HiveJoin.class));
+          hepPgmBldr.addRuleInstance(new LoptOptimizeJoinRule(HiveRelFactories.HIVE_BUILDER));
+
+          HepProgram hepPgm = hepPgmBldr.build();
+          HepPlanner hepPlanner = new HepPlanner(hepPgm);
+
+          hepPlanner.registerMetadataProviders(list);
+          RelMetadataProvider chainedProvider = ChainedRelMetadataProvider.of(list);
+          optCluster.setMetadataProvider(new CachingRelMetadataProvider(chainedProvider, hepPlanner));
+
+          RelNode rootRel = calcitePreCboPlan;
+          hepPlanner.setRoot(rootRel);
+          if (!calcitePreCboPlan.getTraitSet().equals(desiredTraits)) {
+            rootRel = hepPlanner.changeTraits(calcitePreCboPlan, desiredTraits);
+          }
+          hepPlanner.setRoot(rootRel);
+
+          calciteOptimizedPlan = hepPlanner.findBestExp();
+        } catch (Exception e) {
+          boolean isMissingStats = noColsMissingStats.get() > 0;
+          if (isMissingStats) {
+            LOG.warn("Missing column stats (see previous messages), skipping join reordering in CBO");
+            noColsMissingStats.set(0);
+            calciteOptimizedPlan = calcitePreCboPlan;
+            disableSemJoinReordering = false;
+          } else {
+            throw e;
+          }
+        }
+        perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Join Reordering");
+      } else {
+        calciteOptimizedPlan = calcitePreCboPlan;
+        disableSemJoinReordering = false;
+      }
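// Editor's note: the hepPlan(...) helper used throughout apply() is defined
// later in this file, outside this excerpt. A minimal sketch of what such a
// helper plausibly does, assuming it only wires the given rules into a single
// HEP pass (the real method also takes a followPlanChanges flag and a fourth
// argument that is null in all calls shown here):
//
//   private RelNode hepPlanSketch(RelNode basePlan, RelMetadataProvider mdProvider,
//       HepMatchOrder order, RelOptRule... rules) {
//     HepProgramBuilder builder = new HepProgramBuilder().addMatchOrder(order);
//     for (RelOptRule rule : rules) {
//       builder.addRuleInstance(rule);        // register each rewrite rule
//     }
//     HepPlanner hepPlanner = new HepPlanner(builder.build());
//     hepPlanner.registerMetadataProviders(Lists.newArrayList(mdProvider));
//     hepPlanner.setRoot(basePlan);
//     return hepPlanner.findBestExp();        // apply rules to a fixed point
//   }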
+
+      // 4. Run other optimizations that do not need stats
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+      calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null,
+          HepMatchOrder.BOTTOM_UP, ProjectRemoveRule.INSTANCE, UnionMergeRule.INSTANCE,
+          HiveProjectMergeRule.INSTANCE_NO_FORCE, HiveAggregateProjectMergeRule.INSTANCE,
+          HiveJoinCommuteRule.INSTANCE);
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Optimizations without stats");
+
+      // 5. Materialized view based rewriting
+      // We disable it for CTAS and MV creation queries (trying to avoid any problem
+      // due to data freshness)
+      if (conf.getBoolVar(ConfVars.HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING) &&
+          !getQB().isMaterializedView() && !getQB().isCTAS()) {
+        perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+        // Use Calcite cost model for view rewriting
+        RelMetadataProvider calciteMdProvider = DefaultRelMetadataProvider.INSTANCE;
+        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(calciteMdProvider));
+        planner.registerMetadataProviders(Lists.newArrayList(calciteMdProvider));
+        // Add views to planner
+        List<RelOptMaterialization> materializations = new ArrayList<>();
+        try {
+          materializations = Hive.get().getRewritingMaterializedViews();
+          // We need to use the current cluster for the scan operator on views,
+          // otherwise the planner will throw an Exception (different planners)
+          materializations = Lists.transform(materializations,
+              new Function<RelOptMaterialization, RelOptMaterialization>() {
+                @Override
+                public RelOptMaterialization apply(RelOptMaterialization materialization) {
+                  final RelNode viewScan = materialization.tableRel;
+                  final RelNode newViewScan;
+                  if (viewScan instanceof DruidQuery) {
+                    final DruidQuery dq = (DruidQuery) viewScan;
+                    newViewScan = DruidQuery.create(optCluster, optCluster.traitSetOf(HiveRelNode.CONVENTION),
+                        viewScan.getTable(), dq.getDruidTable(),
+                        ImmutableList.<RelNode>of(dq.getTableScan()));
+                  } else {
+                    newViewScan = new HiveTableScan(optCluster, optCluster.traitSetOf(HiveRelNode.CONVENTION),
+                        (RelOptHiveTable) viewScan.getTable(), viewScan.getTable().getQualifiedName().get(0),
+                        null, false, false);
+                  }
+                  return new RelOptMaterialization(newViewScan, materialization.queryRel, null);
+                }
+              }
+          );
+        } catch (HiveException e) {
+          LOG.warn("Exception loading materialized views", e);
+        }
+        if (!materializations.isEmpty()) {
+          for (RelOptMaterialization materialization : materializations) {
+            planner.addMaterialization(materialization);
+          }
+          // Add view-based rewriting rules to planner
+          planner.addRule(HiveMaterializedViewFilterScanRule.INSTANCE);
+          // Optimize plan
+          planner.setRoot(calciteOptimizedPlan);
+          calciteOptimizedPlan = planner.findBestExp();
+          // Remove view-based rewriting rules from planner
+          planner.clear();
+        }
+        // Restore default cost model
+        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(mdProvider.getMetadataProvider()));
+        perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: View-based rewriting");
+      }
+    // 6. Run aggregate-join transpose (cost based)
+    // If it fails because of missing stats, we continue with
+    // the rest of the optimizations.
+    if (conf.getBoolVar(ConfVars.AGGR_JOIN_TRANSPOSE)) {
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+      try {
+        calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null,
+            HepMatchOrder.BOTTOM_UP, HiveAggregateJoinTransposeRule.INSTANCE);
+      } catch (Exception e) {
+        boolean isMissingStats = noColsMissingStats.get() > 0;
+        if (isMissingStats) {
+          LOG.warn("Missing column stats (see previous messages), skipping aggregate-join transpose in CBO");
+          noColsMissingStats.set(0);
+        } else {
+          throw e;
+        }
+      }
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Aggregate join transpose");
+    }
+
+    // 7. Convert Join + GBy to semijoin
+    // We run this rule at a later stage, since many Calcite rules cannot deal
+    // with semijoins.
+    if (conf.getBoolVar(ConfVars.SEMIJOIN_CONVERSION)) {
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+      calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null, HiveSemiJoinRule.INSTANCE);
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Semijoin conversion");
+    }
+
+    // 8. Run rule to fix windowing issues when windowing is done over
+    // aggregation columns (HIVE-10627)
+    if (profilesCBO.contains(ExtendedCBOProfile.WINDOWING_POSTPROCESSING)) {
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+      calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null,
+          HepMatchOrder.BOTTOM_UP, HiveWindowingFixRule.INSTANCE);
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Window fixing rule");
+    }
+
+    // 9. Apply Druid transformation rules
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null,
+        HepMatchOrder.BOTTOM_UP, DruidRules.FILTER, DruidRules.AGGREGATE_PROJECT,
+        DruidRules.PROJECT, DruidRules.AGGREGATE, DruidRules.SORT_PROJECT_TRANSPOSE,
+        DruidRules.SORT, DruidRules.PROJECT_SORT_TRANSPOSE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Druid transformation rules");
+
+    // 10. Run rules to aid in translation from Calcite tree to Hive tree
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CBO_RETPATH_HIVEOP)) {
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
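+      // Ex (illustrative): on this return path, adjacent inner joins such as
+      //   HiveJoin(HiveJoin(R1, R2), R3)
+      // may be collapsed by HiveJoinToMultiJoinRule below into a single n-way
+      // HiveMultiJoin(R1, R2, R3), which maps onto one Hive join operator.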
+      // 10.1. Merge join into multijoin operators (if possible)
+      calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, true, mdProvider.getMetadataProvider(), null,
+          HepMatchOrder.BOTTOM_UP, HiveJoinProjectTransposeRule.BOTH_PROJECT_INCLUDE_OUTER,
+          HiveJoinProjectTransposeRule.LEFT_PROJECT_INCLUDE_OUTER,
+          HiveJoinProjectTransposeRule.RIGHT_PROJECT_INCLUDE_OUTER,
+          HiveJoinToMultiJoinRule.INSTANCE, HiveProjectMergeRule.INSTANCE);
+      // The previous rules can pull up projections through join operators,
+      // so we run the field trimmer again to push them back down
+      fieldTrimmer = new HiveRelFieldTrimmer(null,
+          HiveRelFactories.HIVE_BUILDER.create(optCluster, null));
+      calciteOptimizedPlan = fieldTrimmer.trim(calciteOptimizedPlan);
+      calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null,
+          HepMatchOrder.BOTTOM_UP, ProjectRemoveRule.INSTANCE,
+          new ProjectMergeRule(false, HiveRelFactories.HIVE_BUILDER));
+      calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, true, mdProvider.getMetadataProvider(), null,
+          HiveFilterProjectTSTransposeRule.INSTANCE, HiveFilterProjectTSTransposeRule.INSTANCE_DRUID,
+          HiveProjectFilterPullUpConstantsRule.INSTANCE);
+
+      // 10.2. Introduce exchange operators below join/multijoin operators
+      calciteOptimizedPlan = hepPlan(calciteOptimizedPlan, false, mdProvider.getMetadataProvider(), null,
+          HepMatchOrder.BOTTOM_UP, HiveInsertExchange4JoinRule.EXCHANGE_BELOW_JOIN,
+          HiveInsertExchange4JoinRule.EXCHANGE_BELOW_MULTIJOIN);
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: Translation from Calcite tree to Hive tree");
+    }
+
+    if (LOG.isDebugEnabled() && !conf.getBoolVar(ConfVars.HIVE_IN_TEST)) {
+      LOG.debug("CBO Planning details:\n");
+      LOG.debug("Original Plan:\n" + RelOptUtil.toString(calciteGenPlan));
+      LOG.debug("Plan After PPD, PartPruning, ColumnPruning:\n"
+          + RelOptUtil.toString(calcitePreCboPlan));
+      LOG.debug("Plan After Join Reordering:\n"
+          + RelOptUtil.toString(calciteOptimizedPlan, SqlExplainLevel.ALL_ATTRIBUTES));
+    }
+
+    return calciteOptimizedPlan;
+  }
+
+  /**
+   * Perform all optimizations before Join Ordering.
+   *
+   * @param basePlan
+   *          original plan
+   * @param mdProvider
+   *          metadata provider
+   * @param executorProvider
+   *          executor
+   * @return plan after the pre-join-ordering transformations
+   */
+  private RelNode applyPreJoinOrderingTransforms(RelNode basePlan, RelMetadataProvider mdProvider, RexExecutor executorProvider) {
+    // TODO: Decorrelation of subqueries should be done before attempting
+    // Partition Pruning; otherwise expression evaluation may try to execute a
+    // correlated subquery.
+
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+
+    final int maxCNFNodeCount = conf.getIntVar(HiveConf.ConfVars.HIVE_CBO_CNF_NODES_LIMIT);
+    final int minNumORClauses = conf.getIntVar(HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZERMIN);
+
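+    // Both limits are used further below (illustrative values): with
+    // minNumORClauses = 2, HivePointLookupOptimizerRule may compact
+    //   (x = 1) or (x = 2) or (x = 3)
+    // into x in (1, 2, 3), while maxCNFNodeCount bounds how large a CNF
+    // expansion HivePreFilteringRule is allowed to produce when factoring
+    // predicates.
+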
+    // 0. SetOp rewrite
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, true, mdProvider, null, HepMatchOrder.BOTTOM_UP,
+        HiveProjectOverIntersectRemoveRule.INSTANCE, HiveIntersectMergeRule.INSTANCE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: HiveProjectOverIntersectRemoveRule and HiveIntersectMerge rules");
+
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, false, mdProvider, executorProvider, HepMatchOrder.BOTTOM_UP,
+        HiveIntersectRewriteRule.INSTANCE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: HiveIntersectRewrite rule");
+
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, false, mdProvider, executorProvider, HepMatchOrder.BOTTOM_UP,
+        HiveExceptRewriteRule.INSTANCE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: HiveExceptRewrite rule");
+
+    // 1. Distinct aggregate rewrite
+    // Run this optimization early, since it expands the operator pipeline.
+    if (!conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("mr") &&
+        conf.getBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEDISTINCTREWRITE)) {
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+      // It is not clear whether this rewrite always pays off on MR, since the
+      // extra map phase introduced for the second MR job may offset the gains
+      // of the multi-stage aggregation. We need a cost model for MR to enable
+      // this on MR.
+      basePlan = hepPlan(basePlan, true, mdProvider, executorProvider, HiveExpandDistinctAggregatesRule.INSTANCE);
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+          "Calcite: Prejoin ordering transformation, Distinct aggregate rewrite");
+    }
+
+    // 2. Try factoring out common filter elements & separating deterministic
+    // vs non-deterministic UDFs. This needs to run before PPD so that PPD can
+    // add on-clauses for old-style join syntax.
+    // Ex: select * from R1 join R2 where (((R1.x=R2.x) and R1.y<10) or
+    // ((R1.x=R2.x) and R1.z=10)) and rand(1) < 0.1
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, false, mdProvider, executorProvider, HepMatchOrder.ARBITRARY,
+        new HivePreFilteringRule(maxCNFNodeCount));
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: Prejoin ordering transformation, factoring out common filter elements and separating deterministic vs non-deterministic UDFs");
+
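+    // In the Ex above, the common conjunct R1.x = R2.x may be factored out of
+    // the OR, yielding R1.x = R2.x and (R1.y < 10 or R1.z = 10) and
+    // rand(1) < 0.1, with the deterministic conjuncts separated from the
+    // non-deterministic rand(1) < 0.1 so that PPD can later push the former
+    // into the join as an on-clause.
+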
+    // 3. Run exhaustive PPD, add not-null filters, transitive inference,
+    // constant propagation, constant folding
+    List<RelOptRule> rules = Lists.newArrayList();
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTPPD_WINDOWING)) {
+      rules.add(HiveFilterProjectTransposeRule.INSTANCE_DETERMINISTIC_WINDOWING);
+    } else {
+      rules.add(HiveFilterProjectTransposeRule.INSTANCE_DETERMINISTIC);
+    }
+    rules.add(HiveFilterSetOpTransposeRule.INSTANCE);
+    rules.add(HiveFilterSortTransposeRule.INSTANCE);
+    rules.add(HiveFilterJoinRule.JOIN);
+    rules.add(HiveFilterJoinRule.FILTER_ON_JOIN);
+    rules.add(new HiveFilterAggregateTransposeRule(Filter.class, HiveRelFactories.HIVE_FILTER_FACTORY, Aggregate.class));
+    rules.add(new FilterMergeRule(HiveRelFactories.HIVE_BUILDER));
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_REDUCE_WITH_STATS)) {
+      rules.add(HiveReduceExpressionsWithStatsRule.INSTANCE);
+    }
+    rules.add(HiveProjectFilterPullUpConstantsRule.INSTANCE);
+    rules.add(HiveReduceExpressionsRule.PROJECT_INSTANCE);
+    rules.add(HiveReduceExpressionsRule.FILTER_INSTANCE);
+    rules.add(HiveReduceExpressionsRule.JOIN_INSTANCE);
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVEPOINTLOOKUPOPTIMIZER)) {
+      rules.add(new HivePointLookupOptimizerRule.FilterCondition(minNumORClauses));
+      rules.add(new HivePointLookupOptimizerRule.JoinCondition(minNumORClauses));
+    }
+    rules.add(HiveJoinAddNotNullRule.INSTANCE_JOIN);
+    rules.add(HiveJoinAddNotNullRule.INSTANCE_SEMIJOIN);
+    rules.add(HiveJoinPushTransitivePredicatesRule.INSTANCE_JOIN);
+    rules.add(HiveJoinPushTransitivePredicatesRule.INSTANCE_SEMIJOIN);
+    rules.add(HiveSortMergeRule.INSTANCE);
+    rules.add(HiveSortLimitPullUpConstantsRule.INSTANCE);
+    rules.add(HiveUnionPullUpConstantsRule.INSTANCE);
+    rules.add(HiveAggregatePullUpConstantsRule.INSTANCE);
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, true, mdProvider, executorProvider, HepMatchOrder.BOTTOM_UP,
+        rules.toArray(new RelOptRule[rules.size()]));
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: Prejoin ordering transformation, PPD, not null predicates, transitive inference, constant folding");
+
+    // 4. Push down limit through outer join
+    // NOTE: We run this after PPD to support old-style join syntax.
+    // Ex: select * from R1 left outer join R2 where (((R1.x=R2.x) and R1.y<10) or
+    // ((R1.x=R2.x) and R1.z=10)) and rand(1) < 0.1 order by R1.x limit 10
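+    // In the Ex above (illustrative; whether the rewrite pays off is guarded
+    // by the reductionProportion/reductionTuples thresholds read below), the
+    // left outer join preserves every R1 row, so HiveSortJoinReduceRule may
+    // push a copy of "order by R1.x limit 10" onto the R1 branch: the top 10
+    // output rows can only originate from the 10 smallest R1.x rows.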
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_LIMIT_TRANSPOSE)) {
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+      // This should be a cost-based decision, but until we enable the extended
+      // cost model, we use the configured values for these variables
+      final float reductionProportion = HiveConf.getFloatVar(conf,
+          HiveConf.ConfVars.HIVE_OPTIMIZE_LIMIT_TRANSPOSE_REDUCTION_PERCENTAGE);
+      final long reductionTuples = HiveConf.getLongVar(conf,
+          HiveConf.ConfVars.HIVE_OPTIMIZE_LIMIT_TRANSPOSE_REDUCTION_TUPLES);
+      basePlan = hepPlan(basePlan, true, mdProvider, executorProvider, HiveSortMergeRule.INSTANCE,
+          HiveSortProjectTransposeRule.INSTANCE, HiveSortJoinReduceRule.INSTANCE,
+          HiveSortUnionReduceRule.INSTANCE);
+      basePlan = hepPlan(basePlan, true, mdProvider, executorProvider, HepMatchOrder.BOTTOM_UP,
+          new HiveSortRemoveRule(reductionProportion, reductionTuples),
+          HiveProjectSortTransposeRule.INSTANCE);
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+          "Calcite: Prejoin ordering transformation, Push down limit through outer join");
+    }
+
+    // 5. Push Down Semi Joins
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, true, mdProvider, executorProvider, SemiJoinJoinTransposeRule.INSTANCE,
+        SemiJoinFilterTransposeRule.INSTANCE, SemiJoinProjectTransposeRule.INSTANCE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: Prejoin ordering transformation, Push Down Semi Joins");
+
+    // 6. Apply Partition Pruning
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, false, mdProvider, executorProvider, new HivePartitionPruneRule(conf));
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: Prejoin ordering transformation, Partition Pruning");
+
+    // 7. Projection Pruning (this introduces a Select above the TS and hence
+    // needs to run after Partition Pruning)
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    HiveRelFieldTrimmer fieldTrimmer = new HiveRelFieldTrimmer(null,
+        HiveRelFactories.HIVE_BUILDER.create(cluster, null),
+        profilesCBO.contains(ExtendedCBOProfile.JOIN_REORDERING));
+    basePlan = fieldTrimmer.trim(basePlan);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: Prejoin ordering transformation, Projection Pruning");
+
+    // 8. Merge, remove and reduce Project if possible
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, false, mdProvider, executorProvider,
+        HiveProjectMergeRule.INSTANCE, ProjectRemoveRule.INSTANCE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: Prejoin ordering transformation, Merge Project-Project");
+
+    // 9. Rerun PPD through Project as column pruning would have introduced
+    // a DT above scans; by pushing the filter just above the TS, Hive can push
+    // it into storage (in case there are filters on non-partition columns).
+    // This only matches FIL-PROJ-TS.
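+    // Ex (illustrative): a subplan Filter(a > 10) over Project(a, b) over
+    // TS(t) is transposed to Project(a, b) over Filter(a > 10) over TS(t), so
+    // the filter sits directly on the TableScan and can be handed to storage.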
+    perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+    basePlan = hepPlan(basePlan, true, mdProvider, executorProvider,
+        HiveFilterProjectTSTransposeRule.INSTANCE, HiveFilterProjectTSTransposeRule.INSTANCE_DRUID,
+        HiveProjectFilterPullUpConstantsRule.INSTANCE);
+    perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER,
+        "Calcite: Prejoin ordering transformation, Rerun PPD");
+
+    return basePlan;
+  }
+
+  /**
+   * Run the HEP Planner with the given rule set, using the default TOP_DOWN
+   * match order.
+   *
+   * @param basePlan plan to optimize
+   * @param followPlanChanges whether the rules are applied as a single
+   *          collection that is re-matched as the plan changes, or one by one
+   *          as separate program steps
+   * @param mdProvider metadata provider
+   * @param executorProvider executor used for constant folding
+   * @param rules rules to apply
+   * @return optimized RelNode
+   */
+  private RelNode hepPlan(RelNode basePlan, boolean followPlanChanges,
+      RelMetadataProvider mdProvider, RexExecutor executorProvider, RelOptRule... rules) {
+    return hepPlan(basePlan, followPlanChanges, mdProvider, executorProvider,
+        HepMatchOrder.TOP_DOWN, rules);
+  }
+
+  /**
+   * Run the HEP Planner with the given rule set and match order.
+   *
+   * @param basePlan plan to optimize
+   * @param followPlanChanges whether the rules are applied as a single
+   *          collection that is re-matched as the plan changes, or one by one
+   *          as separate program steps
+   * @param mdProvider metadata provider
+   * @param executorProvider executor used for constant folding
+   * @param order match order for the HEP program
+   * @param rules rules to apply
+   * @return optimized RelNode
+   */
+  private RelNode hepPlan(RelNode basePlan, boolean followPlanChanges,
+      RelMetadataProvider mdProvider, RexExecutor executorProvider, HepMatchOrder order,
+      RelOptRule... rules) {
+
+    RelNode optimizedRelNode = basePlan;
+    HepProgramBuilder programBuilder = new HepProgramBuilder();
+    if (followPlanChanges) {
+      programBuilder.addMatchOrder(order);
+      programBuilder = programBuilder.addRuleCollection(ImmutableList.copyOf(rules));
+    } else {
+      // TODO: Should this also be TOP_DOWN?
+      for (RelOptRule r : rules) {
+        programBuilder.addRuleInstance(r);
+      }
+    }
+
+    // Create planner and copy context
+    HepPlanner planner = new HepPlanner(programBuilder.build(),
+        basePlan.getCluster().getPlanner().getContext());
+
+    List<RelMetadataProvider> list = Lists.newArrayList();
+    list.add(mdProvider);
+    planner.registerMetadataProviders(list);
+    RelMetadataProvider chainedProvider = ChainedRelMetadataProvider.of(list);
+    basePlan.getCluster().setMetadataProvider(
+        new CachingRelMetadataProvider(chainedProvider, planner));
+
+    if (executorProvider != null) {
+      // basePlan.getCluster().getPlanner() is the VolcanoPlanner from apply();
+      // both planners need to use the correct executor
+      basePlan.getCluster().getPlanner().setExecutor(executorProvider);
+      planner.setExecutor(executorProvider);
+    }
+
+    planner.setRoot(basePlan);
+    optimizedRelNode = planner.findBestExp();
+
+    return optimizedRelNode;
+  }
+
+  @SuppressWarnings("nls")
+  private RelNode genSetOpLogicalPlan(Opcode opcode, String alias, String leftalias, RelNode leftRel,
+      String rightalias, RelNode rightRel) throws SemanticException {
+    // 1. Get Row Resolvers, Column map for original left and right input of
+    // SetOp Rel
+    RowResolver leftRR = this.relToHiveRR.get(leftRel);
+    RowResolver rightRR = this.relToHiveRR.get(rightRel);
+    HashMap<String, ColumnInfo> leftmap = leftRR.getFieldMap(leftalias);
+    HashMap<String, ColumnInfo> rightmap = rightRR.getFieldMap(rightalias);
+
+    // 2. Validate that SetOp is feasible according to Hive (by using type
+    // info from RR)
+    if (leftmap.size() != rightmap.size()) {
+      throw new SemanticException("Schema of both sides of setop should match.");
+    }
+
+    ASTNode tabref = getQB().getAliases().isEmpty() ? null : getQB().getParseInfo()
+        .getSrcForAlias(getQB().getAliases().get(0));
+
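+    // Ex (illustrative; c_int/c_bigint are hypothetical columns): for
+    //   select c_int from t1 union all select c_bigint from t2
+    // the loop below resolves the common type bigint, so the int side is cast
+    // up; if no common type exists, a SemanticException is raised.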
+    // 3. Construct SetOp output RR using the original left & right inputs
+    RowResolver setOpOutRR = new RowResolver();
+
+    Iterator<Map.Entry<String, ColumnInfo>> lIter = leftmap.entrySet().iterator();
+    Iterator<Map.Entry<String, ColumnInfo>> rIter = rightmap.entrySet().iterator();
+    while (lIter.hasNext()) {
+      Map.Entry<String, ColumnInfo> lEntry = lIter.next();
+      Map.Entry<String, ColumnInfo> rEntry = rIter.next();
+      ColumnInfo lInfo = lEntry.getValue();
+      ColumnInfo rInfo = rEntry.getValue();
+
+      String field = lEntry.getKey();
+      // try widening conversion, otherwise fail the setop
+      TypeInfo commonTypeInfo = FunctionRegistry.getCommonClassForUnionAll(lInfo.getType(),
+          rInfo.getType());
+      if (commonTypeInfo == null) {
+        throw new SemanticException(generateErrorMessage(tabref,
+            "Schema of both sides of setop should match: Column " + field
+                + " is of type " + lInfo.getType().getTypeName()
+                + " on first table and type " + rInfo.getType().getTypeName()
+                + " on second table"));
+      }
+      ColumnInfo setOpColInfo = new ColumnInfo(lInfo);
+      setOpColInfo.setType(commonTypeInfo);
+      setOpOutRR.put(alias, field, setOpColInfo);
+    }
+
+    // 4. Determine which columns require a cast on the left/right input
+    // (Calcite requires exact types on both sides of a SetOp)
+    boolean leftNeedsTypeCast = false;
+    boolean rightNeedsTypeCast = false;
+    List<RexNode> leftProjs = new ArrayList<RexNode>();
+    List<RexNode> rightProjs = new ArrayList<RexNode>();
+    List<RelDataTypeField> leftRowDT = leftRel.getRowType().getFieldList();
+    List<RelDataTypeField> rightRowDT = rightRel.getRowType().getFieldList();
+
+    RelDataType leftFieldDT;
+    RelDataType rightFieldDT;
+    RelDataType unionFieldDT;
+    for (int i = 0; i < leftRowDT.size(); i++) {
+      leftFieldDT = leftRowDT.get(i).getType();
+      rightFieldDT = rightRowDT.get(i).getType();
+      if (!leftFieldDT.equals(rightFieldDT)) {
+        unionFieldDT = TypeConverter.convert(setOpOutRR.getColumnInfos().get(i).getType(),
+            cluster.getTypeFactory());
+        if (!unionFieldDT.equals(leftFieldDT)) {
+          leftNeedsTypeCast = true;
+        }
+        leftProjs.add(cluster.getRexBuilder().ensureType(unionFieldDT,
+            cluster.getRexBuilder().makeInputRef(leftFieldDT, i), true));
+
+        if (!unionFieldDT.equals(rightFieldDT)) {
+          rightNeedsTypeCast = true;
+        }
+        rightProjs.add(cluster.getRexBuilder().ensureType(unionFieldDT,
+            cluster.getRexBuilder().makeInputRef(rightFieldDT, i), true));
+      } else {
+        leftProjs.add(cluster.getRexBuilder().ensureType(leftFieldDT,
+            cluster.getRexBuilder().makeInputRef(leftFieldDT, i), true));
+        rightProjs.add(cluster.getRexBuilder().ensureType(rightFieldDT,
+            cluster.getRexBuilder().makeInputRef(rightFieldDT, i), true));
+      }
+    }
+
+    // 5. Introduce a Project Rel above the original left/right inputs if a
+    // cast is needed for type parity
+    RelNode setOpLeftInput = leftRel;
+    RelNode setOpRightInput = rightRel;
+    if (leftNeedsTypeCast) {
+      setOpLeftInput = HiveProject.create(leftRel, leftProjs, leftRel.getRowType()
+          .getFieldNames());
+    }
+    if (rightNeedsTypeCast) {
+      setOpRightInput = HiveProject.create(rightRel, rightProjs, rightRel.getRowType()
+          .getFieldNames());
+    }
+
+    // 6. Construct SetOp Rel
+    Builder<RelNode> bldr = new ImmutableList.Builder<RelNode>();
+    bldr.add(setOpLeftInput);
+    bldr.add(setOpRightInput);
+    SetOp setOpRel = null;
+    switch (opcode) {
+    case UNION:
+      setOpRel = new HiveUnion(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build());
+      break;
+    case INTERSECT:
+      setOpRel = new HiveIntersect(cluster, TraitsUtil.getDefaultTraitSet(cluster), bldr.build(),
+          false);
+
<TRUNCATED>