HIVE-17626: Query reoptimization using cached runtime statistics (Zoltan Haindrich reviewed by Ashutosh Chauhan)
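This patch adds a re-execution layer around the Driver: when a query fails, pluggable strategies may patch the configuration ("overlay") or recompile the plan against operator statistics collected at runtime ("reoptimize"). A minimal sketch of how a client lands on the new code path, assuming an embedded HiveConf; the ConfVars and factory method are the ones added in the hunks below:

    // illustration only, not part of the patch
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_ENABLED, true);
    conf.setVar(HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES, "overlay,reoptimize");
    IDriver driver = DriverFactory.newDriver(conf);
    // with re-execution enabled this is a ReExecDriver; with it disabled, a plain Driver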
Signed-off-by: Zoltan Haindrich <k...@rxd.hu>

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/31e36f01
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/31e36f01
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/31e36f01

Branch: refs/heads/master
Commit: 31e36f01952e7bcc803060351758798f9af9e0b3
Parents: cd8eda8
Author: Zoltan Haindrich <k...@rxd.hu>
Authored: Wed Mar 7 08:44:14 2018 +0100
Committer: Zoltan Haindrich <k...@rxd.hu>
Committed: Wed Mar 7 08:44:14 2018 +0100

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java    |  35 +-
 data/conf/llap/hive-site.xml                     |   5 +
 .../druid/serde/DruidScanQueryRecordReader.java  |   2 +-
 .../test/resources/testconfiguration.properties  |   3 +
 .../org/apache/hadoop/hive/ql/QTestUtil.java     |   2 +-
 .../apache/hadoop/hive/ql/TestQTestUtil.java     |   2 +-
 .../java/org/apache/hadoop/hive/ql/Context.java  |  46 ++-
 .../java/org/apache/hadoop/hive/ql/Driver.java   |  55 ++-
 .../apache/hadoop/hive/ql/DriverFactory.java     |  51 ++-
 .../org/apache/hadoop/hive/ql/HookRunner.java    |   5 +
 .../java/org/apache/hadoop/hive/ql/IDriver.java  |   7 +
 .../hive/ql/cache/results/CacheUsage.java        |   2 +-
 .../ql/cache/results/QueryResultsCache.java      |   2 +-
 .../hadoop/hive/ql/exec/FunctionRegistry.java    |   1 +
 .../hive/ql/exec/MaterializedViewDesc.java       |   2 +-
 .../hive/ql/exec/MaterializedViewTask.java       |   2 +-
 .../apache/hadoop/hive/ql/exec/Operator.java     |  54 ++-
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java  |  21 +-
 .../hive/ql/exec/tez/HiveInputCounters.java      |   2 +-
 .../hive/ql/exec/tez/LlapObjectSubCache.java     |   2 +-
 .../VectorReduceSinkCommonOperator.java          |  15 +-
 .../VectorReduceSinkEmptyKeyOperator.java        |  38 --
 .../VectorReduceSinkObjectHashOperator.java      |  28 --
 .../VectorReduceSinkUniformHashOperator.java     |  35 --
 .../hive/ql/hooks/PrivateHookContext.java        |  50 +++
 .../hadoop/hive/ql/metadata/HiveException.java   |   1 +
 .../hive/ql/optimizer/SharedWorkOptimizer.java   |   3 +
 .../HiveRelOpMaterializationValidator.java       |   2 +-
 .../hive/ql/optimizer/physical/Vectorizer.java   |  32 +-
 .../ql/optimizer/signature/OpSignature.java      |  94 +++++
 .../ql/optimizer/signature/OpTreeSignature.java  |  90 +++++
 .../signature/OpTreeSignatureFactory.java        |  67 ++++
 .../hive/ql/optimizer/signature/Signature.java   |  36 ++
 .../ql/optimizer/signature/SignatureUtils.java   |  95 +++++
 .../spark/SparkPartitionPruningSinkDesc.java     |   3 +
 .../stats/annotation/StatsRulesProcFactory.java  |  61 ++-
 .../org/apache/hadoop/hive/ql/parse/HiveLexer.g  |   1 +
 .../apache/hadoop/hive/ql/parse/HiveParser.g     |  13 +-
 .../hadoop/hive/ql/parse/IdentifiersParser.g     |   1 +
 .../hive/ql/plan/AbstractOperatorDesc.java       |  12 +-
 .../hadoop/hive/ql/plan/AppMasterEventDesc.java  |   7 +-
 .../hive/ql/plan/CommonMergeJoinDesc.java        |   3 +
 .../hive/ql/plan/DynamicPruningEventDesc.java    |   5 +
 .../hadoop/hive/ql/plan/FileSinkDesc.java        |  18 +
 .../apache/hadoop/hive/ql/plan/FilterDesc.java   |   5 +
 .../apache/hadoop/hive/ql/plan/GroupByDesc.java  |  13 +-
 .../hadoop/hive/ql/plan/HashTableSinkDesc.java   |   5 +
 .../hadoop/hive/ql/plan/JoinCondDesc.java        |   6 +-
 .../apache/hadoop/hive/ql/plan/JoinDesc.java     |   8 +
 .../hive/ql/plan/LateralViewJoinDesc.java        |   3 +
 .../apache/hadoop/hive/ql/plan/LimitDesc.java    |   3 +
 .../apache/hadoop/hive/ql/plan/MapJoinDesc.java  |   6 +
 .../hadoop/hive/ql/plan/OperatorDesc.java        |   3 +
 .../hadoop/hive/ql/plan/ReduceSinkDesc.java      |  13 +-
 .../apache/hadoop/hive/ql/plan/ScriptDesc.java   |   8 +-
 .../apache/hadoop/hive/ql/plan/SelectDesc.java   |   5 +
 .../apache/hadoop/hive/ql/plan/Statistics.java   |  27 +-
 .../hadoop/hive/ql/plan/TableScanDesc.java       |  19 +
 .../apache/hadoop/hive/ql/plan/UDTFDesc.java     |   7 +-
 .../hive/ql/plan/mapper/EmptyStatsSource.java    |  28 ++
 .../hive/ql/plan/mapper/GroupTransformer.java    |  25 ++
 .../hadoop/hive/ql/plan/mapper/PlanMapper.java   | 112 ++++++
 .../hive/ql/plan/mapper/PlanMapperProcess.java   |  47 +++
 .../hive/ql/plan/mapper/RuntimeStatsSource.java  |  29 ++
 .../plan/mapper/SimpleRuntimeStatsSource.java    |  65 ++++
 .../hadoop/hive/ql/plan/mapper/StatsSource.java  |  25 ++
 .../hive/ql/reexec/IReExecutionPlugin.java       |  64 ++++
 .../hadoop/hive/ql/reexec/ReExecDriver.java      | 263 +++++++++++++
 .../ql/reexec/ReExecutionOverlayPlugin.java      |  83 ++++
 .../hadoop/hive/ql/reexec/ReOptimizePlugin.java  | 138 +++++++
 .../hadoop/hive/ql/stats/OperatorStats.java      |  43 +++
 .../hive/ql/stats/OperatorStatsReaderHook.java   |  96 +++++
 .../ql/udf/generic/GenericUDFAssertTrueOOM.java  |  74 ++++
 .../GenericUDFEnforceNotNullConstraint.java      |   2 +-
 .../exec/vector/TestVectorGroupByOperator.java   |   6 +-
 .../optimizer/signature/TestOpSigFactory.java    | 120 ++++++
 .../signature/TestOperatorSignature.java         | 159 ++++++++
 .../ql/plan/mapping/TestCounterMapping.java      | 194 ++++++++++
 .../hive/ql/plan/mapping/TestOperatorCmp.java    | 186 +++++++++
 .../ql/plan/mapping/TestReOptimization.java      | 188 +++++++++
 .../apache/hive/testutils/HiveTestEnvSetup.java  |   2 +-
 .../hive/testutils/TestHiveTestEnvSetup.java     |   2 +-
 .../queries/clientnegative/bad_exec_hooks.q      |   1 +
 ql/src/test/queries/clientpositive/hook_order.q  |   2 +
 .../test/queries/clientpositive/retry_failure.q  |  11 +
 .../queries/clientpositive/retry_failure_oom.q   |  12 +
 .../clientpositive/retry_failure_stat_changes.q  |  29 ++
 .../clientpositive/llap/dp_counter_mm.q.out      | 110 ++++++
 .../clientpositive/llap/dp_counter_non_mm.q.out  | 110 ++++++
 .../clientpositive/llap/orc_llap_counters.q.out  | 310 +++++++++++++++
 .../llap/orc_llap_counters1.q.out                |  16 +
 .../clientpositive/llap/orc_ppd_basic.q.out      | 380 +++++++++++++++++++
 .../llap/orc_ppd_schema_evol_3a.q.out            | 380 +++++++++++++++++++
 .../clientpositive/llap/retry_failure.q.out      |  43 +++
 .../clientpositive/llap/retry_failure_oom.q.out  |  43 +++
 .../llap/retry_failure_stat_changes.q.out        | 280 ++++++++++++++
 .../llap/tez_input_counters.q.out                |  47 +++
 .../results/clientpositive/show_functions.q.out  |   1 +
 .../hive/metastore/hooks/URIResolverHook.java    |   2 +-
 .../messaging/CreateDatabaseMessage.java         |   2 +-
 100 files changed, 4618 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d955b48..fb926eb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1879,7 +1879,6 @@ public class HiveConf extends Configuration {
         "in the number of rows filtered by a certain operator, which in turn might lead to overprovision or\n" +
         "underprovision of resources. This factor is applied to the cardinality estimation of IN clauses in\n" +
         "filter operators."),
-    // Concurrency
     HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false,
         "Whether Hive supports concurrency control or not. 
\n" + @@ -3691,22 +3690,29 @@ public class HiveConf extends Configuration { HIVE_CONF_INTERNAL_VARIABLE_LIST("hive.conf.internal.variable.list", "hive.added.files.path,hive.added.jars.path,hive.added.archives.path", "Comma separated list of variables which are used internally and should not be configurable."), - HIVE_SPARK_RSC_CONF_LIST("hive.spark.rsc.conf.list", SPARK_OPTIMIZE_SHUFFLE_SERDE.varname + "," + SPARK_CLIENT_FUTURE_TIMEOUT.varname, "Comma separated list of variables which are related to remote spark context.\n" + "Changing these variables will result in re-creating the spark session."), - HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s", new TimeValidator(TimeUnit.SECONDS), "Timeout for Running Query in seconds. A nonpositive value means infinite. " + "If the query timeout is also set by thrift API call, the smaller one will be taken."), - - HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new SizeValidator(0L, true, 1024L, true), "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."), + HIVE_QUERY_REEXECUTION_ENABLED("hive.query.reexecution.enabled", true, + "Enable query reexecutions"), + HIVE_QUERY_REEXECUTION_STRATEGIES("hive.query.reexecution.strategies", "overlay,reoptimize", + "comma separated list of plugin can be used:\n" + + " overlay: hiveconf subtree 'reexec.overlay' is used as an overlay in case of an execution errors out\n" + + " reoptimize: collects operator statistics during execution and recompile the query after a failure"), + HIVE_QUERY_MAX_REEXECUTION_COUNT("hive.query.reexecution.max.count", 1, + "Maximum number of re-executions for a single query."), + HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS("hive.query.reexecution.always.collect.operator.stats", false, + "Used during testing"), + HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true, "If the query results cache is enabled. 
This will keep results of previously executed queries " + "to be reused if the same query is executed again."), @@ -5090,4 +5096,23 @@ public class HiveConf extends Configuration { return reverseMap; } } + + public void verifyAndSetAll(Map<String, String> overlay) { + for (Entry<String, String> entry : overlay.entrySet()) { + verifyAndSet(entry.getKey(), entry.getValue()); + } + } + + public Map<String, String> subtree(String string) { + Map<String, String> ret = new HashMap<>(); + for (Entry<Object, Object> entry : getProps().entrySet()) { + String key = (String) entry.getKey(); + String value = (String) entry.getValue(); + if (key.startsWith(string)) { + ret.put(key.substring(string.length() + 1), value); + } + } + return ret; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/data/conf/llap/hive-site.xml ---------------------------------------------------------------------- diff --git a/data/conf/llap/hive-site.xml b/data/conf/llap/hive-site.xml index c4c299c..990b473 100644 --- a/data/conf/llap/hive-site.xml +++ b/data/conf/llap/hive-site.xml @@ -353,4 +353,9 @@ <value>false</value> </property> +<property> + <name>tez.counters.max</name> + <value>1024</value> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java ---------------------------------------------------------------------- diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java index cbeac2c..68ac88c 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index e8aa827..084d5db 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -516,6 +516,9 @@ minillaplocal.query.files=\ bucketmapjoin6.q,\ bucketmapjoin7.q,\ bucketpruning1.q,\ + retry_failure.q,\ + retry_failure_stat_changes.q,\ + retry_failure_oom.q,\ bucketsortoptimize_insert_2.q,\ cbo_gby.q,\ cbo_join.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index d6acce7..f0f23ca 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -1827,7 +1827,7 @@ public class QTestUtil { ppm.add(new PatternReplacementPair(Pattern.compile("\\{\"writeid\":[1-9][0-9]*,\"bucketid\":"), "{\"writeid\":### Masked writeid ###,\"bucketid\":")); - ppm.add(new PatternReplacementPair(Pattern.compile("attempt_[0-9]+"), "attempt_#ID#")); + ppm.add(new PatternReplacementPair(Pattern.compile("attempt_[0-9_]+"), "attempt_#ID#")); ppm.add(new PatternReplacementPair(Pattern.compile("vertex_[0-9_]+"), "vertex_#ID#")); ppm.add(new PatternReplacementPair(Pattern.compile("task_[0-9_]+"), "task_#ID#")); partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]); http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/itests/util/src/test/java/org/apache/hadoop/hive/ql/TestQTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/test/java/org/apache/hadoop/hive/ql/TestQTestUtil.java b/itests/util/src/test/java/org/apache/hadoop/hive/ql/TestQTestUtil.java index c01d87b..1a8eb33 100644 --- a/itests/util/src/test/java/org/apache/hadoop/hive/ql/TestQTestUtil.java +++ b/itests/util/src/test/java/org/apache/hadoop/hive/ql/TestQTestUtil.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index dba2dbb..58fa5f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -58,6 +59,10 @@ import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.parse.QB; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; +import org.apache.hadoop.hive.ql.plan.mapper.EmptyStatsSource; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; +import org.apache.hadoop.hive.ql.plan.mapper.RuntimeStatsSource; +import org.apache.hadoop.hive.ql.plan.mapper.StatsSource; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.shims.ShimLoader; @@ -65,6 +70,7 @@ import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * Context for Semantic Analyzers. Usage: not reusable - construct a new one for * each query should call clear() at end of use to remove temporary folders @@ -153,6 +159,11 @@ public class Context { private Operation operation = Operation.OTHER; private WmContext wmContext; + private boolean isExplainPlan = false; + private PlanMapper planMapper = new PlanMapper(); + private RuntimeStatsSource runtimeStatsSource; + private int executionIndex; + public void setOperation(Operation operation) { this.operation = operation; } @@ -229,7 +240,7 @@ public class Context { } if(!thisIsInASubquery) { throw new IllegalStateException("Expected '" + getMatchedText(curNode) + "' to be in sub-query or set operation."); - } + } return DestClausePrefix.INSERT; } switch (operation) { @@ -252,7 +263,7 @@ public class Context { assert insert != null && insert.getType() == HiveParser.TOK_INSERT; ASTNode query = (ASTNode) insert.getParent(); assert query != null && query.getType() == HiveParser.TOK_QUERY; - + for(int childIdx = 1; childIdx < query.getChildCount(); childIdx++) {//1st child is TOK_FROM assert query.getChild(childIdx).getType() == HiveParser.TOK_INSERT; if(insert == query.getChild(childIdx)) { @@ -997,7 +1008,7 @@ public class Context { public ExplainConfiguration getExplainConfig() { return explainConfig; } - private boolean isExplainPlan = false; + public boolean isExplainPlan() { return isExplainPlan; } @@ -1033,4 +1044,33 @@ public class Context { public String getExecutionId() { return executionId; } + + public PlanMapper getPlanMapper() { + return planMapper; + } + + public void setRuntimeStatsSource(RuntimeStatsSource runtimeStatsSource) { + this.runtimeStatsSource = runtimeStatsSource; + } + + public Optional<RuntimeStatsSource> getRuntimeStatsSource() { + return Optional.ofNullable(runtimeStatsSource); + } + + public StatsSource getStatsSource() { + if (runtimeStatsSource != null) { + return runtimeStatsSource; + } else { + // hierarchical; add def stats also here + return new EmptyStatsSource(); + } + } 
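The getStatsSource() fallback above keeps the compiler oblivious to re-execution: it always receives some StatsSource, an EmptyStatsSource on a first run. A hedged sketch of the hand-off a re-optimizing plugin could perform between two runs, using the Driver accessors added further down; the SimpleRuntimeStatsSource constructor shape is an assumption based on the file list:

    // hypothetical wiring between two executions, not the committed plugin code
    PlanMapper pm = driver.getPlanMapper();                          // operator stats of the failed run
    driver.setRuntimeStatsSource(new SimpleRuntimeStatsSource(pm));  // assumed (PlanMapper) constructor
    // the next compile() propagates the source into the Context, so
    // getStatsSource() then returns it instead of EmptyStatsSource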
+ + public int getExecutionIndex() { + return executionIndex; + } + + public void setExecutionIndex(int executionIndex) { + this.executionIndex = executionIndex; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 6999777..d789ed0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -77,6 +77,7 @@ import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.HookUtils; +import org.apache.hadoop.hive.ql.hooks.PrivateHookContext; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -109,6 +110,8 @@ import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; +import org.apache.hadoop.hive.ql.plan.mapper.RuntimeStatsSource; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.security.authorization.AuthorizationUtils; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; @@ -191,6 +194,7 @@ public class Driver implements IDriver { // Transaction manager used for the query. This will be set at compile time based on // either initTxnMgr or from the SessionState, in that order. private HiveTxnManager queryTxnMgr; + private RuntimeStatsSource runtimeStatsSource; private CacheUsage cacheUsage; private CacheEntry usedCacheEntry; @@ -282,6 +286,15 @@ public class Driver implements IDriver { return schema; } + @Override + public Context getContext() { + return ctx; + } + + public PlanMapper getPlanMapper() { + return ctx.getPlanMapper(); + } + /** * Get a Schema with fields represented with native Hive types */ @@ -557,6 +570,7 @@ public class Driver implements IDriver { setTriggerContext(queryId); } + ctx.setRuntimeStatsSource(runtimeStatsSource); ctx.setCmd(command); ctx.setHDFSCleanup(true); @@ -579,7 +593,6 @@ public class Driver implements IDriver { hookRunner.runBeforeCompileHook(command); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE); - BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree); // Flush the metastore cache. This assures that we don't pick up objects from a previous // query running in this same thread. This has to be done after we get our semantic @@ -587,15 +600,7 @@ public class Driver implements IDriver { // because at that point we need access to the objects. 
Hive.get().getMSC().flushCache(); - if(checkConcurrency() && startImplicitTxn(queryTxnMgr)) { - String userFromUGI = getUserFromUGI(); - if (!queryTxnMgr.isTxnOpen()) { - if(userFromUGI == null) { - throw createProcessorResponse(10); - } - long txnid = queryTxnMgr.openTxn(ctx, userFromUGI); - } - } + BaseSemanticAnalyzer sem; // Do semantic analysis and plan generation if (hookRunner.hasPreAnalyzeHooks()) { HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl(); @@ -606,12 +611,15 @@ public class Driver implements IDriver { hookCtx.setHiveOperation(queryState.getHiveOperation()); tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree); - + sem = SemanticAnalyzerFactory.get(queryState, tree); + openTransaction(); sem.analyze(tree, ctx); hookCtx.update(sem); hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks()); } else { + sem = SemanticAnalyzerFactory.get(queryState, tree); + openTransaction(); sem.analyze(tree, ctx); } LOG.info("Semantic Analysis Completed"); @@ -749,6 +757,18 @@ public class Driver implements IDriver { ctx.setWmContext(wmContext); } + private void openTransaction() throws LockException, CommandProcessorResponse { + if (checkConcurrency() && startImplicitTxn(queryTxnMgr)) { + String userFromUGI = getUserFromUGI(); + if (!queryTxnMgr.isTxnOpen()) { + if (userFromUGI == null) { + throw createProcessorResponse(10); + } + long txnid = queryTxnMgr.openTxn(ctx, userFromUGI); + } + } + } + private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { boolean shouldOpenImplicitTxn = !ctx.isExplainPlan(); //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443 @@ -1272,7 +1292,9 @@ public class Driver implements IDriver { return; } } - if (!AcidUtils.isTransactionalTable(tbl)) return; + if (!AcidUtils.isTransactionalTable(tbl)) { + return; + } String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName()); tableList.add(fullTableName); } @@ -1922,9 +1944,9 @@ public class Driver implements IDriver { SessionState ss = SessionState.get(); - hookContext = new HookContext(plan, queryState, ctx.getPathToCS(), SessionState.get().getUserName(), + hookContext = new PrivateHookContext(plan, queryState, ctx.getPathToCS(), SessionState.get().getUserName(), ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId, - ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger, queryInfo); + ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger, queryInfo, ctx); hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK); hookRunner.runPreHooks(hookContext); @@ -2591,4 +2613,9 @@ public class Driver implements IDriver { public HookRunner getHookRunner() { return hookRunner; } + + public void setRuntimeStatsSource(RuntimeStatsSource runtimeStatsSource) { + this.runtimeStatsSource = runtimeStatsSource; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java index 60e8de8..0f6a80e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java @@ -18,31 +18,54 @@ package org.apache.hadoop.hive.ql; +import java.util.ArrayList; + import org.apache.hadoop.hive.conf.HiveConf; +import 
org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.reexec.IReExecutionPlugin; +import org.apache.hadoop.hive.ql.reexec.ReExecDriver; +import org.apache.hadoop.hive.ql.reexec.ReExecutionOverlayPlugin; +import org.apache.hadoop.hive.ql.reexec.ReOptimizePlugin; + +import com.google.common.base.Strings; /** - * Constructs a driver for ql clients + * Constructs a driver for ql clients. */ public class DriverFactory { - enum ExecutionStrategy { - none { - @Override - IDriver build(QueryState queryState, String userName, QueryInfo queryInfo) { - return new Driver(queryState, userName, queryInfo); - } - }; - - abstract IDriver build(QueryState queryState, String userName, QueryInfo queryInfo); - } - public static IDriver newDriver(HiveConf conf) { return newDriver(getNewQueryState(conf), null, null); } public static IDriver newDriver(QueryState queryState, String userName, QueryInfo queryInfo) { - ExecutionStrategy strategy = ExecutionStrategy.none; - return strategy.build(queryState, userName, queryInfo); + boolean enabled = queryState.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ENABLED); + if (!enabled) { + return new Driver(queryState, userName, queryInfo); + } + + String strategies = queryState.getConf().getVar(ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES); + strategies = Strings.nullToEmpty(strategies).trim().toLowerCase(); + ArrayList<IReExecutionPlugin> plugins = new ArrayList<>(); + for (String string : strategies.split(",")) { + if (string.trim().isEmpty()) { + continue; + } + plugins.add(buildReExecPlugin(string)); + } + + return new ReExecDriver(queryState, userName, queryInfo, plugins); + } + + private static IReExecutionPlugin buildReExecPlugin(String name) throws RuntimeException { + if (name.equals("overlay")) { + return new ReExecutionOverlayPlugin(); + } + if (name.equals("reoptimize")) { + return new ReOptimizePlugin(); + } + throw new RuntimeException( + "Unknown re-execution plugin: " + name + " (" + ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES.varname + ")"); } private static QueryState getNewQueryState(HiveConf conf) { http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/HookRunner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/HookRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/HookRunner.java index 2a32a51..a3105b6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/HookRunner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/HookRunner.java @@ -318,4 +318,9 @@ public class HookRunner { public void addOnFailureHook(ExecuteWithHookContext hook) { onFailureHooks.add(hook); } + + public void addSemanticAnalyzerHook(HiveSemanticAnalyzerHook hook) { + saHooks.add(hook); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java index 9f13fa8..41737fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hive.ql; import java.io.IOException; import java.util.List; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.conf.HiveConf; import 
org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.exec.FetchTask; @@ -30,6 +32,8 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; /** * Hive query executor driver */ +@InterfaceAudience.Private +@InterfaceStability.Unstable public interface IDriver extends CommandProcessor { int compile(String string); @@ -66,4 +70,7 @@ public interface IDriver extends CommandProcessor { void destroy(); HiveConf getConf(); + + Context getContext(); + } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java index 08b791a..bb217a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/CacheUsage.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java index 131127e..88a056b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java index 77e9263..d59bf1f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java @@ -467,6 +467,7 @@ public final class FunctionRegistry { system.registerGenericUDF("array", GenericUDFArray.class); system.registerGenericUDF("assert_true", GenericUDFAssertTrue.class); + system.registerGenericUDF("assert_true_oom", GenericUDFAssertTrueOOM.class); system.registerGenericUDF("map", GenericUDFMap.class); system.registerGenericUDF("struct", GenericUDFStruct.class); system.registerGenericUDF("named_struct", GenericUDFNamedStruct.class); http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewDesc.java index 1e28ca8..f7af073 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewDesc.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java index 2b345d6..de120af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MaterializedViewTask.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 199b181..c28ef99 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -84,7 +84,8 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C protected final AtomicBoolean abortOp; private transient ExecMapperContext execContext; private transient boolean rootInitializeCalled = false; - protected transient long runTimeNumRows; + protected transient long numRows = 0; + protected transient long runTimeNumRows = 0; protected int indexForTezUnion = -1; private transient Configuration hconf; protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>(); @@ -108,6 +109,14 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C // one of its parent is not in state CLOSE.. } + /** + * Counters. + */ + public enum Counter { + RECORDS_OUT_OPERATOR, + RECORDS_OUT_INTERMEDIATE + } + protected transient State state = State.UNINIT; private boolean useBucketizedHiveInputFormat; @@ -224,7 +233,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C @SuppressWarnings("rawtypes") protected transient OutputCollector out; protected transient final Logger LOG = LoggerFactory.getLogger(getClass().getName()); - protected transient final Logger PLOG = LoggerFactory.getLogger(Operator.class.getName()); // for simple disabling logs from all operators protected transient String alias; protected transient Reporter reporter; protected String id; @@ -317,6 +325,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C @SuppressWarnings("unchecked") public final void initialize(Configuration hconf, ObjectInspector[] inputOIs) throws HiveException { + // String className = this.getClass().getName(); this.done = false; @@ -490,6 +499,14 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C rootInitializeCalled = true; } + public String getCounterName(Counter counter, Configuration hconf) { + String context = hconf.get(Operator.CONTEXT_NAME_KEY, ""); + if (context != null && !context.isEmpty()) { + context = "_" + context.replace(" ", "_"); + } + return counter + context; + } + /** * Calls initialize on each of the children with outputObjetInspector as the * output row format. 
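The getCounterName() helper above derives vertex-scoped counter names, and the closeOp() hunk that follows publishes two counters per operator. A small sketch of the resulting names, assuming an operator op running in a Tez vertex named "Map 1" (op and the operator id "RS_3" are hypothetical):

    // naming illustration based on the hunks above and below
    Configuration hconf = new Configuration();
    hconf.set(Operator.CONTEXT_NAME_KEY, "Map 1");
    String name = op.getCounterName(Operator.Counter.RECORDS_OUT_INTERMEDIATE, hconf);
    // name == "RECORDS_OUT_INTERMEDIATE_Map_1"; the per-operator counter is keyed
    // as RECORDS_OUT_OPERATOR_<operatorId>, e.g. "RECORDS_OUT_OPERATOR_RS_3"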
@@ -708,6 +725,10 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C if (conf != null && conf.getRuntimeStatsTmpDir() != null) { publishRunTimeStats(); } + LongWritable runTimeRowsWritable = new LongWritable(runTimeNumRows); + LongWritable recordCounter = new LongWritable(numRows); + statsMap.put(Counter.RECORDS_OUT_OPERATOR.name() + "_" + getOperatorId(), runTimeRowsWritable); + statsMap.put(getCounterName(Counter.RECORDS_OUT_INTERMEDIATE, hconf), recordCounter); this.runTimeNumRows = 0; reporter = null; @@ -970,12 +991,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C } } - public void resetStats() { - for (String e : statsMap.keySet()) { - statsMap.get(e).set(0L); - } - } - public void reset(){ this.state=State.INIT; if (childOperators != null) { @@ -1562,4 +1577,27 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C (conf == other.getConf() || (conf != null && other.getConf() != null && conf.isSame(other.getConf()))); } + + /** + * Compares the whole operator tree with the other. + */ + // Currently only used during re-optimization related parts. + // FIXME: HIVE-18703 should probably move this method somewhere else + public final boolean logicalEqualsTree(Operator<?> o) { + // XXX: this could easily become a hot-spot + if (!logicalEquals(o)) { + return false; + } + if (o.getNumParent() != getNumParent()) { + return false; + } + for (int i = 0; i < getNumParent(); i++) { + Operator<? extends OperatorDesc> copL = parentOperators.get(i); + Operator<? extends OperatorDesc> copR = o.parentOperators.get(i); + if (!copL.logicalEquals(copR)) { + return false; + } + } + return true; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 395a5f4..d4363fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -31,7 +31,6 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -50,7 +49,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; import org.apache.hadoop.io.BinaryComparable; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.OutputCollector; @@ -62,13 +60,6 @@ import org.apache.hadoop.util.hash.MurmurHash; public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> implements Serializable, TopNHash.BinaryCollector { - /** - * Counters. 
- */ - public static enum Counter { - RECORDS_OUT_INTERMEDIATE - } - private static final long serialVersionUID = 1L; private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance(); @@ -140,10 +131,8 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> // TODO: we only ever use one row of these at a time. Why do we need to cache multiple? protected transient Object[][] cachedKeys; - protected transient long numRows = 0; protected transient long cntr = 1; protected transient long logEveryNRows = 0; - private final transient LongWritable recordCounter = new LongWritable(); /** Kryo ctor. */ protected ReduceSinkOperator() { @@ -163,9 +152,6 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> cntr = 1; logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS); - final String vertexName = hconf.get(Operator.CONTEXT_NAME_KEY, ""); - statsMap.put(Utilities.getVertexCounterName(Counter.RECORDS_OUT_INTERMEDIATE.name(), vertexName), recordCounter); - List<ExprNodeDesc> keys = conf.getKeyCols(); if (LOG.isDebugEnabled()) { @@ -363,7 +349,10 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> // if TopNHashes are active, proceed if not already excluded (i.e order by limit) final int firstIndex = (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD; - if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do. + if (firstIndex == TopNHash.EXCLUDE) + { + return; // Nothing to do. + } // Compute value and hashcode - we'd either store or forward them. BytesWritable value = makeValueWritable(row); @@ -531,6 +520,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> if (!abort && reducerHash != null) { reducerHash.flush(); } + runTimeNumRows = numRows; super.closeOp(abort); out = null; random = null; @@ -538,7 +528,6 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> if (LOG.isTraceEnabled()) { LOG.info(toString() + ": records written - " + numRows); } - recordCounter.set(numRows); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveInputCounters.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveInputCounters.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveInputCounters.java index 085d6a7..3100c33 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveInputCounters.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveInputCounters.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectSubCache.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectSubCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectSubCache.java index 0d31e6e..7c5857c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectSubCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapObjectSubCache.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java index 8dd7cfe..cc876d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; import org.apache.hadoop.hive.ql.exec.TerminalOperator; import org.apache.hadoop.hive.ql.exec.TopNHash; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -36,9 +35,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; @@ -56,10 +53,7 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerialize import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hive.common.util.HashCodeUtil; import com.google.common.base.Preconditions; @@ -136,10 +130,8 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re // Where to write our key and value pairs. private transient OutputCollector out; - private transient long numRows = 0; private transient long cntr = 1; private transient long logEveryNRows = 0; - private final transient LongWritable recordCounter = new LongWritable(); // For debug tracing: the name of the map or reduce task. protected transient String taskName; @@ -274,7 +266,6 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re if (context != null && !context.isEmpty()) { context = "_" + context.replace(" ","_"); } - statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter); reduceSkipTag = conf.getSkipTag(); reduceTagByte = (byte) conf.getTag(); @@ -355,7 +346,9 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re final int firstIndex = reducerHash.tryStoreKey(keyWritable, /* partColsIsNull */ false); - if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do. + if (firstIndex == TopNHash.EXCLUDE) { + return; // Nothing to do. 
+ } if (firstIndex == TopNHash.FORWARD) { doCollect(keyWritable, valueWritable); @@ -399,6 +392,7 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re if (!abort && reducerHash != null) { reducerHash.flush(); } + runTimeNumRows = numRows; super.closeOp(abort); out = null; reducerHash = null; @@ -406,7 +400,6 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re LOG.info(toString() + ": records written - " + numRows); } this.runTimeNumRows = numRows; - recordCounter.set(numRows); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkEmptyKeyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkEmptyKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkEmptyKeyOperator.java index 134fc0f..78e64d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkEmptyKeyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkEmptyKeyOperator.java @@ -18,54 +18,16 @@ package org.apache.hadoop.hive.ql.exec.vector.reducesink; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.Random; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; -import org.apache.hadoop.hive.ql.exec.TerminalOperator; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; -import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized; -import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.VectorDesc; -import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; -import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.ByteStream.Output; -import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; -import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; -import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import 
org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hive.common.util.HashCodeUtil; import com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java index 1eb72ce..1bc3fda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java @@ -18,54 +18,26 @@ package org.apache.hadoop.hive.ql.exec.vector.reducesink; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; -import org.apache.hadoop.hive.ql.exec.TerminalOperator; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; -import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; -import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized; -import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.VectorDesc; -import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; -import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ByteStream.Output; -import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; -import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hive.common.util.HashCodeUtil; import com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java index 384bd74..c98663c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java @@ -18,53 +18,18 @@ package org.apache.hadoop.hive.ql.exec.vector.reducesink; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.Random; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter; -import org.apache.hadoop.hive.ql.exec.TerminalOperator; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; -import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized; -import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.VectorDesc; -import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; -import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ByteStream.Output; -import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; -import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; -import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; 
-import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.OutputCollector; import org.apache.hive.common.util.HashCodeUtil; import com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/hooks/PrivateHookContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PrivateHookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PrivateHookContext.java new file mode 100644 index 0000000..605436b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PrivateHookContext.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.hooks; + +import java.util.Map; + +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.QueryInfo; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.log.PerfLogger; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PrivateHookContext extends HookContext { + + private final Context ctx; + + public PrivateHookContext(QueryPlan queryPlan, QueryState queryState, + Map<String, ContentSummary> inputPathToContentSummary, String userName, String ipAddress, + String hiveInstanceAddress, String operationId, String sessionId, String threadId, boolean isHiveServerQuery, + PerfLogger perfLogger, QueryInfo queryInfo, Context ctx) throws Exception { + super(queryPlan, queryState, inputPathToContentSummary, userName, ipAddress, hiveInstanceAddress, operationId, + sessionId, threadId, isHiveServerQuery, perfLogger, queryInfo); + this.ctx = ctx; + } + + public Context getContext() { + return ctx; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java index b758507..bfbbe3a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; @InterfaceAudience.Public @InterfaceStability.Stable public class HiveException extends Exception { + /** * Standard predefined message with error code and possibly SQL State, etc. 
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java index b758507..bfbbe3a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveException.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg; @InterfaceAudience.Public @InterfaceStability.Stable public class HiveException extends Exception { + /** * Standard predefined message with error code and possibly SQL State, etc. */ http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index b0cf3bd..bc6d0bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -598,6 +598,7 @@ public class SharedWorkOptimizer extends Transform { new LinkedList<>(tableToTotalSize.entrySet()); Collections.sort(sortedTables, Collections.reverseOrder( new Comparator<Map.Entry<String, Long>>() { + @Override public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) { return (o1.getValue()).compareTo(o2.getValue()); } @@ -637,6 +638,7 @@ public class SharedWorkOptimizer extends Transform { new LinkedList<>(opToTotalSize.entrySet()); Collections.sort(sortedOps, Collections.reverseOrder( new Comparator<Map.Entry<Operator<?>, Long>>() { + @Override public int compare(Map.Entry<Operator<?>, Long> o1, Map.Entry<Operator<?>, Long> o2) { int valCmp = o1.getValue().compareTo(o2.getValue()); if (valCmp == 0) { @@ -648,6 +650,7 @@ public class SharedWorkOptimizer extends Transform { return sortedOps; } + // FIXME: probably this should also be integrated with the isSame() logic private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache, TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException { // First we check if the two table scan operators can actually be merged http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java index 8c1bcb3..df216e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/HiveRelOpMaterializationValidator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 783a672..13a2fc4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -28,21 +28,17 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.Stack; -import java.util.TreeMap; import java.util.TreeSet; import java.util.regex.Pattern; import org.apache.commons.lang.ArrayUtils; -import org.apache.calcite.util.Pair; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; import org.slf4j.Logger; @@ -83,16 +79,10 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationDesc; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping; -import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator; -import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator; -import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator; -import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorizedUDAFs; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.HiveVectorAdaptorUsageMode; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType; @@ -105,22 +95,15 @@ import org.apache.hadoop.hive.ql.io.NullRowsInputFormat; import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; -import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; -import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; -import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; -import org.apache.hadoop.hive.ql.lib.PreOrderOnceWalker; -import org.apache.hadoop.hive.ql.lib.PreOrderWalker; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.lib.TaskGraphWalker; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; 
import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType; import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; import org.apache.hadoop.hive.ql.plan.AggregationDesc; import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc; @@ -138,11 +121,9 @@ import org.apache.hadoop.hive.ql.plan.LimitDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.OpTraits; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PTFDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; -import org.apache.hadoop.hive.ql.plan.Statistics; import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc; import org.apache.hadoop.hive.ql.plan.VectorDesc; import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc; @@ -179,13 +160,9 @@ import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.VectorSelectDesc; -import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; -import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; -import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; -import org.apache.hadoop.hive.ql.plan.ptf.PartitionDef; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef; import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef; @@ -231,8 +208,6 @@ import org.apache.hadoop.hive.ql.udf.UDFToString; import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear; import org.apache.hadoop.hive.ql.udf.UDFYear; import org.apache.hadoop.hive.ql.udf.generic.*; -import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator; -import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.NullStructSerDe; @@ -256,8 +231,6 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hive.common.util.AnnotationUtils; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; - import com.google.common.collect.ImmutableSet; import com.google.common.base.Preconditions; @@ -1415,7 +1388,9 @@ public class Vectorizer implements PhysicalPlanResolver { return false; } for (Class<?> badClass : excludes) { - if (badClass.isAssignableFrom(ifClass)) return true; + if (badClass.isAssignableFrom(ifClass)) { + return true; + } } return false; } @@ -4792,6 +4767,7 @@ public class Vectorizer implements PhysicalPlanResolver { setOperatorIssue(e.getMessage()); throw new VectorizerCannotVectorizeException(); } + Preconditions.checkState(vectorOp != null); if (vectorTaskColumnInfo != null && !isNative) { vectorTaskColumnInfo.setAllNative(false); http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java new file mode 100644 index 0000000..90b2fd3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpSignature.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.signature; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Signature of the operator (non-recursive). + */ +public class OpSignature { + + /** + * Holds the signature of the operator; the keys are the names of the methods marked by {@link Signature}. + */ + private Map<String, Object> sigMap; + // FIXME: this is currently retained... + // but later the signature should be able to provide the same comparison granularity as op.logicalEquals does right now + private Operator<? extends OperatorDesc> op; + + private OpSignature(Operator<? extends OperatorDesc> op) { + this.op = op; + sigMap = new HashMap<>(); + // FIXME: consider including operator info as well, not just conf? + SignatureUtils.write(sigMap, op.getConf()); + } + + public static OpSignature of(Operator<? extends OperatorDesc> op) { + return new OpSignature(op); + } + + @Override + public int hashCode() { + return sigMap.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof OpSignature)) { + return false; + } + if (obj == this) { + return true; + } + OpSignature o = (OpSignature) obj; + return op.logicalEquals(o.op); + } + + public boolean signatureCompare(OpSignature other) { + return sigMap.equals(other.sigMap); + } + + @VisibleForTesting + public void proveEquals(OpSignature other) { + proveEquals(sigMap, other.sigMap); + } + + private static void proveEquals(Map<String, Object> m1, Map<String, Object> m2) { + for (Entry<String, Object> e : m1.entrySet()) { + String key = e.getKey(); + Object v1 = e.getValue(); + Object v2 = m2.get(key); + if (v1 == v2) { + continue; + } + if (v1 == null || v2 == null || !v1.equals(v2)) { + throw new RuntimeException(String.format("equals fails: %s (%s!=%s)", key, v1, v2)); + } + } + } + +}
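As a usage sketch of the non-recursive signature (illustrative only; op1 and op2 stand for operators taken from two compiled plans):

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.optimizer.signature.OpSignature;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public class OpSignatureSketch {
      // hashCode() is derived from the collected signature map, so it can act
      // as a cheap pre-filter before comparing the maps entry by entry.
      static boolean sameSignature(Operator<? extends OperatorDesc> op1,
          Operator<? extends OperatorDesc> op2) {
        OpSignature s1 = OpSignature.of(op1);
        OpSignature s2 = OpSignature.of(op2);
        return s1.hashCode() == s2.hashCode() && s1.signatureCompare(s2);
      }
    }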
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java new file mode 100644 index 0000000..c6d1a6a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignature.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.signature; + +import java.util.ArrayList; +import java.util.Objects; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; + +/** + * Operator tree signature. + */ +public class OpTreeSignature { + private Operator<?> op; + private int hashCode; + private OpSignature sig; + private ArrayList<OpTreeSignature> parentSig; + + OpTreeSignature(Operator<?> op, OpTreeSignatureFactory osf) { + this.op = op; + sig = OpSignature.of(op); + parentSig = new ArrayList<>(); + for (Operator<? extends OperatorDesc> parentOp : op.getParentOperators()) { + parentSig.add(osf.getSignature(parentOp)); + } + hashCode = Objects.hash(sig, parentSig); + } + + public static OpTreeSignature of(Operator<?> root) { + return of(root, OpTreeSignatureFactory.DIRECT); + } + + public static OpTreeSignature of(Operator<? extends OperatorDesc> op, OpTreeSignatureFactory osf) { + return new OpTreeSignature(op, osf); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof OpTreeSignature)) { + return false; + } + if (obj == this) { + return true; + } + OpTreeSignature o = (OpTreeSignature) obj; + // TODO: this should be removed as soon as signatures are able to provide the same level of confidence as logicalEquals + return logicalEqualsTree(op, o.op); + } + + // XXX: this isn't cheap! :) + private final boolean logicalEqualsTree(Operator<?> o1, Operator<?> o) { + if (!o1.logicalEquals(o)) { + return false; + } + if (o.getNumParent() != o1.getNumParent()) { + return false; + } + for (int i = 0; i < o1.getNumParent(); i++) { + Operator<? extends OperatorDesc> copL = o1.getParentOperators().get(i); + Operator<? extends OperatorDesc> copR = o.getParentOperators().get(i); + if (!copL.logicalEquals(copR)) { + return false; + } + } + return true; + } + +}
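One plausible use, in line with this commit's goal of reoptimizing queries from cached runtime statistics, is to key recorded runtime observations by tree signature so that a recompilation of the same query can find them. A minimal sketch; the Long row-count payload is a stand-in, not the commit's actual statistics type:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;

    public class RuntimeStatsCacheSketch {
      // Logically equal operator trees yield equal signatures, so a
      // re-compiled query can look up what an earlier run observed.
      private final Map<OpTreeSignature, Long> observedRows = new HashMap<>();

      public void remember(Operator<?> op, long rows) {
        observedRows.put(OpTreeSignature.of(op), rows);
      }

      public Long recall(Operator<?> op) {
        return observedRows.get(OpTreeSignature.of(op));
      }
    }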
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java new file mode 100644 index 0000000..3df5ee9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/OpTreeSignatureFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.signature; + +import java.util.IdentityHashMap; +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; + +/** + * A simple cache backend to prevent repeated signature computations. + */ +public interface OpTreeSignatureFactory { + + public OpTreeSignature getSignature(Operator<? extends OperatorDesc> op); + + static final OpTreeSignatureFactory DIRECT = new Direct(); + + public static OpTreeSignatureFactory direct() { + return DIRECT; + } + + public static OpTreeSignatureFactory newCache() { + return new CachedFactory(); + } + + // FIXME: possible alternative: move both OpSignature/OpTreeSignature under some class as nested classes; + // that way this factory-level caching could be made "transparent" + + static class Direct implements OpTreeSignatureFactory { + + @Override + public OpTreeSignature getSignature(Operator<? extends OperatorDesc> op) { + return OpTreeSignature.of(op, this); + } + + } + + static class CachedFactory implements OpTreeSignatureFactory { + + Map<Operator<? extends OperatorDesc>, OpTreeSignature> cache = new IdentityHashMap<>(); + + @Override + public OpTreeSignature getSignature(Operator<? extends OperatorDesc> op) { + return cache.computeIfAbsent(op, k -> OpTreeSignature.of(op, this)); + } + + } + +}
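A brief sketch of the cached factory (op1 and op2 stand for sibling operators that share parents):

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature;
    import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignatureFactory;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public class FactorySketch {
      static void signSiblings(Operator<? extends OperatorDesc> op1,
          Operator<? extends OperatorDesc> op2) {
        // The cached factory memoizes per-operator signatures in an
        // IdentityHashMap, so the parent chain shared by op1 and op2
        // is only signed once.
        OpTreeSignatureFactory factory = OpTreeSignatureFactory.newCache();
        OpTreeSignature s1 = factory.getSignature(op1);
        OpTreeSignature s2 = factory.getSignature(op2); // reuses cached parents
      }
    }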
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/Signature.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/Signature.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/Signature.java new file mode 100644 index 0000000..c228a8e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/Signature.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.signature; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks the method to be included in the signature. + * + * The signature is used to enable logical-level comparisons between operator trees. + */ +@Target(value = ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface Signature { + +}
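To show how the annotation is meant to be used, a hypothetical descriptor follows (illustrative only; the commit's real uses are the @Signature additions to existing *Desc classes, such as getTable() in SparkPartitionPruningSinkDesc below):

    import org.apache.hadoop.hive.ql.optimizer.signature.Signature;

    // Hypothetical descriptor: only the annotated getter contributes to the
    // operator signature; getDebugComment() is ignored by SignatureUtils.
    public class ExampleDesc {
      private String predicate;
      private String debugComment;

      @Signature
      public String getPredicate() {
        return predicate;
      }

      public String getDebugComment() {
        return debugComment;
      }
    }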
http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java new file mode 100644 index 0000000..2269322 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/signature/SignatureUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.signature; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Sets; + +/** + * Enables calculating the signature of an object. + * + * If the object has methods annotated with {@link Signature}, they will be used. + * If the object has no methods marked with the annotation, the object itself is used in the signature to prevent incorrect matches. + */ +public class SignatureUtils { + + private static Map<Class<?>, SignatureMapper> mappers = new HashMap<>(); + + public static void write(Map<String, Object> ret, Object o) { + SignatureMapper mapper = getSigMapper(o.getClass()); + mapper.write(ret, o); + } + + static class SignatureMapper { + + static final Set<String> acceptedSignatureTypes = Sets.newHashSet(); + + private List<Method> sigMethods; + + public SignatureMapper(Class<?> o) { + Method[] f = o.getMethods(); + sigMethods = new ArrayList<>(); + for (Method method : f) { + if (method.isAnnotationPresent(Signature.class)) { + Class<?> rType = method.getReturnType(); + String rTypeName = rType.getName(); + if (!rType.isPrimitive() && acceptedSignatureTypes.contains(rTypeName)) { + throw new RuntimeException("unexpected type (" + rTypeName + ") used in signature"); + } + sigMethods.add(method); + } + } + } + + public void write(Map<String, Object> ret, Object o) { + if (sigMethods.isEmpty()) { + // by supplying "o" itself this enforces identity/equals matching, + // which will most probably make the signature unique + ret.put(o.getClass().getName(), o); + } else { + ret.put(o.getClass().getName(), "1"); + for (Method method : sigMethods) { + try { + Object res = method.invoke(o); + ret.put(method.getName(), res); + } catch (Exception e) { + throw new RuntimeException("Error invoking signature method", e); + } + } + } + } + + } + + private static SignatureMapper getSigMapper(Class<?> o) { + SignatureMapper m = mappers.get(o); + if (m == null) { + m = new SignatureMapper(o); + mappers.put(o, m); + } + return m; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/31e36f01/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java index d1c53cf..3f928b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; +import org.apache.hadoop.hive.ql.optimizer.signature.Signature; import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -134,6 +135,7 @@ public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc { info -> info.columnName + " (" + info.columnType + ")").toArray()); } + @Signature public TableDesc getTable() { return table; }
@@ -156,4 +158,5 @@ public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc { } return false; } + }
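Tying the pieces together, SignatureUtils.write reflects over the @Signature methods of the supplied conf object and records one entry per annotated getter plus a class marker. A minimal sketch reusing the hypothetical ExampleDesc from above; the printed contents are indicative only:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.optimizer.signature.SignatureUtils;

    public class SignatureWriteSketch {
      public static void main(String[] args) {
        Map<String, Object> sig = new HashMap<>();
        // ExampleDesc is the hypothetical @Signature-annotated descriptor
        // sketched earlier; its annotated getter yields a "getPredicate" entry.
        SignatureUtils.write(sig, new ExampleDesc());
        // sig now holds the fully qualified class name mapped to "1" (marking
        // an annotated class) plus "getPredicate" -> null for the fresh object.
        System.out.println(sig);
      }
    }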