Repository: hive Updated Branches: refs/heads/branch-3 a39b24660 -> d0769c573
HIVE-19009 : Retain and use runtime statistics during hs2 lifetime (Zoltan Haindrich via Ashutosh Chauhan) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> (cherry picked from commit 9f15e22f4aea99891a37aa1e54d490921e6e1174) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d0769c57 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d0769c57 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d0769c57 Branch: refs/heads/branch-3 Commit: d0769c573b482fe440fbd4dd3c68d43f9c8f9524 Parents: a39b246 Author: Zoltan Haindrich <k...@rxd.hu> Authored: Tue Apr 3 08:51:00 2018 -0700 Committer: Zoltan Haindrich <k...@rxd.hu> Committed: Fri Apr 20 16:12:54 2018 +0200 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 11 +- .../test/resources/testconfiguration.properties | 1 + .../org/apache/hadoop/hive/ql/QTestUtil.java | 3 + .../java/org/apache/hadoop/hive/ql/Context.java | 12 +- .../java/org/apache/hadoop/hive/ql/Driver.java | 4 + .../hive/ql/optimizer/physical/Vectorizer.java | 12 +- .../apache/hadoop/hive/ql/plan/JoinDesc.java | 2 +- .../hive/ql/plan/mapper/CachingStatsSource.java | 68 +++++++++ .../hive/ql/plan/mapper/EmptyStatsSource.java | 11 ++ .../plan/mapper/SimpleRuntimeStatsSource.java | 6 + .../hadoop/hive/ql/plan/mapper/StatsSource.java | 5 +- .../hive/ql/plan/mapper/StatsSources.java | 122 ++++++++++++++++ .../hive/ql/reexec/IReExecutionPlugin.java | 1 + .../hadoop/hive/ql/reexec/ReExecDriver.java | 20 ++- .../ql/reexec/ReExecutionOverlayPlugin.java | 4 + .../hadoop/hive/ql/reexec/ReOptimizePlugin.java | 48 +++++-- .../signature/TestOperatorSignature.java | 9 +- .../ql/plan/mapping/TestCounterMapping.java | 1 - .../queries/clientpositive/runtime_stats_hs2.q | 22 +++ .../clientpositive/llap/runtime_stats_hs2.q.out | 141 +++++++++++++++++++ 20 files changed, 476 insertions(+), 27 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7dd16e3..607f8ba 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -4257,10 +4257,19 @@ public class HiveConf extends Configuration { "comma separated list of plugin can be used:\n" + " overlay: hiveconf subtree 'reexec.overlay' is used as an overlay in case of an execution errors out\n" + " reoptimize: collects operator statistics during execution and recompile the query after a failure"), + HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE("hive.query.reexecution.stats.persist.scope", "query", + new StringSet("query", "hiveserver", "metastore"), + "Sets the persistence scope of runtime statistics\n" + + " query: runtime statistics are only used during re-execution\n" + + " hiveserver: runtime statistics are persisted in the hiveserver - all sessions share it"), + HIVE_QUERY_MAX_REEXECUTION_COUNT("hive.query.reexecution.max.count", 1, "Maximum number of re-executions for a single query."), HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS("hive.query.reexecution.always.collect.operator.stats", false, - "Used during testing"), + "If sessionstats are enabled; this option can be used to collect statistics all the time"), + HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE("hive.query.reexecution.stats.cache.size", 100_000, + "Size of the runtime statistics cache. Unit is: OperatorStat entry; a query plan consist ~100"), + HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true, "If the query results cache is enabled. 
This will keep results of previously executed queries " + http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 183dc4c..d7c5877 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -514,6 +514,7 @@ minillaplocal.query.files=\ retry_failure.q,\ retry_failure_stat_changes.q,\ retry_failure_oom.q,\ + runtime_stats_hs2.q,\ bucketsortoptimize_insert_2.q,\ check_constraint.q,\ cbo_gby.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 2eb4889..a386284 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -98,6 +98,7 @@ import org.apache.hadoop.hive.ql.parse.ParseDriver; import org.apache.hadoop.hive.ql.parse.ParseException; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.mapper.StatsSources; import org.apache.hadoop.hive.ql.processors.CommandProcessor; import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -1071,8 +1072,10 @@ public class QTestUtil { clearTablesCreatedDuringTests(); clearUDFsCreatedDuringTests(); clearKeysCreatedInTests(); + StatsSources.clearAllStats(); } + protected void initConfFromSetup() throws Exception { setup.preTest(conf); 
} http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 9ca8b00..70846ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -159,7 +159,7 @@ public class Context { private boolean isExplainPlan = false; private PlanMapper planMapper = new PlanMapper(); - private StatsSource runtimeStatsSource; + private StatsSource statsSource; private int executionIndex; public void setOperation(Operation operation) { @@ -1047,16 +1047,16 @@ public class Context { return planMapper; } - public void setStatsSource(StatsSource runtimeStatsSource) { - this.runtimeStatsSource = runtimeStatsSource; + public void setStatsSource(StatsSource statsSource) { + this.statsSource = statsSource; } public StatsSource getStatsSource() { - if (runtimeStatsSource != null) { - return runtimeStatsSource; + if (statsSource != null) { + return statsSource; } else { // hierarchical; add def stats also here - return new EmptyStatsSource(); + return EmptyStatsSource.INSTANCE; } } http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4acdd9b..a630b6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -2745,6 +2745,10 @@ public class Driver implements IDriver { this.statsSource = runtimeStatsSource; } + public StatsSource getStatsSource() { + return statsSource; + } + @Override public boolean hasResultSet() { 
http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index e15c5b7..068f25e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -161,6 +161,7 @@ import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.VectorSelectDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef; import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef; @@ -375,6 +376,8 @@ public class Vectorizer implements PhysicalPlanResolver { private Set<VirtualColumn> availableVectorizedVirtualColumnSet = null; private Set<VirtualColumn> neededVirtualColumnSet = null; + private PlanMapper planMapper; + public class VectorizerCannotVectorizeException extends Exception { } @@ -867,7 +870,7 @@ public class Vectorizer implements PhysicalPlanResolver { } private void runDelayedFixups() { - for (Entry<Operator<? extends OperatorDesc>, Set<ImmutablePair<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>>> delayed + for (Entry<Operator<? extends OperatorDesc>, Set<ImmutablePair<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>>> delayed : delayedFixups.entrySet()) { Operator<? extends OperatorDesc> key = delayed.getKey(); Set<ImmutablePair<Operator<? extends OperatorDesc>, Operator<? 
extends OperatorDesc>>> value = @@ -1470,7 +1473,7 @@ public class Vectorizer implements PhysicalPlanResolver { enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname); } } - + return false; } @@ -2247,6 +2250,7 @@ public class Vectorizer implements PhysicalPlanResolver { public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException { hiveConf = physicalContext.getConf(); + planMapper = physicalContext.getContext().getPlanMapper(); String vectorizationEnabledOverrideString = HiveConf.getVar(hiveConf, @@ -2776,7 +2780,7 @@ public class Vectorizer implements PhysicalPlanResolver { } if (exprNodeDescList != null) { ExprNodeDesc exprNodeDesc = exprNodeDescList.get(0); - + if (containsLeadLag(exprNodeDesc)) { setOperatorIssue("lead and lag function not supported in argument expression of aggregation function " + functionName); return false; @@ -5019,6 +5023,8 @@ public class Vectorizer implements PhysicalPlanResolver { LOG.debug("vectorizeOperator " + vectorOp.getClass().getName()); LOG.debug("vectorizeOperator " + vectorOp.getConf().getClass().getName()); + // These operators need to be linked to enable runtime statistics to be gathered/used correctly + planMapper.link(op, vectorOp); return vectorOp; } http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java index 5b7f4c3..e7ca7f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java @@ -556,10 +556,10 @@ public class JoinDesc extends AbstractOperatorDesc { } protected Map<Integer, String> toCompactString(int[][] filterMap) { + filterMap = compactFilter(filterMap); if (filterMap == null) { return null; } - filterMap = 
compactFilter(filterMap); Map<Integer, String> result = new LinkedHashMap<Integer, String>(); for (int i = 0 ; i < filterMap.length; i++) { if (filterMap[i] == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java new file mode 100644 index 0000000..c515276 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/CachingStatsSource.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.plan.mapper; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature; +import org.apache.hadoop.hive.ql.stats.OperatorStats; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +public class CachingStatsSource implements StatsSource { + + + private final Cache<OpTreeSignature, OperatorStats> cache; + + public CachingStatsSource(HiveConf conf) { + int size = conf.getIntVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_CACHE_SIZE); + cache = CacheBuilder.newBuilder().maximumSize(size).build(); + } + + public void put(OpTreeSignature sig, OperatorStats opStat) { + cache.put(sig, opStat); + } + + @Override + public Optional<OperatorStats> lookup(OpTreeSignature treeSig) { + return Optional.ofNullable(cache.getIfPresent(treeSig)); + } + + @Override + public boolean canProvideStatsFor(Class<?> clazz) { + if (Operator.class.isAssignableFrom(clazz)) { + return true; + } + return false; + } + + @Override + public void putAll(Map<OpTreeSignature, OperatorStats> map) { + for (Entry<OpTreeSignature, OperatorStats> entry : map.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java index 72092ce..19df13a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/EmptyStatsSource.java @@ -18,6 +18,7 @@ package 
org.apache.hadoop.hive.ql.plan.mapper; +import java.util.Map; import java.util.Optional; import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature; @@ -25,6 +26,11 @@ import org.apache.hadoop.hive.ql.stats.OperatorStats; public class EmptyStatsSource implements StatsSource { + public static StatsSource INSTANCE = new EmptyStatsSource(); + + private EmptyStatsSource() { + } + @Override public boolean canProvideStatsFor(Class<?> class1) { return false; @@ -35,4 +41,9 @@ public class EmptyStatsSource implements StatsSource { return Optional.empty(); } + @Override + public void putAll(Map<OpTreeSignature, OperatorStats> map) { + throw new RuntimeException("This is an empty source!"); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java index b5a3c24..3d6c257 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/SimpleRuntimeStatsSource.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan.mapper; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; @@ -56,4 +57,9 @@ public class SimpleRuntimeStatsSource implements StatsSource { return false; } + @Override + public void putAll(Map<OpTreeSignature, OperatorStats> map) { + throw new RuntimeException(); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java index df5aa0c..e8d51c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSource.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan.mapper; +import java.util.Map; import java.util.Optional; import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature; @@ -25,8 +26,10 @@ import org.apache.hadoop.hive.ql.stats.OperatorStats; public interface StatsSource { - boolean canProvideStatsFor(Class<?> class1); + boolean canProvideStatsFor(Class<?> clazz); Optional<OperatorStats> lookup(OpTreeSignature treeSig); + void putAll(Map<OpTreeSignature, OperatorStats> map); + } http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java new file mode 100644 index 0000000..a4e33c3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/mapper/StatsSources.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan.mapper; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.optimizer.signature.OpTreeSignature; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper.EquivGroup; +import org.apache.hadoop.hive.ql.stats.OperatorStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +public class StatsSources { + + public static class MapBackedStatsSource implements StatsSource { + + private Map<OpTreeSignature, OperatorStats> map = new HashMap<>(); + + @Override + public boolean canProvideStatsFor(Class<?> clazz) { + if (Operator.class.isAssignableFrom(clazz)) { + return true; + } + return false; + } + + @Override + public Optional<OperatorStats> lookup(OpTreeSignature treeSig) { + return Optional.ofNullable(map.get(treeSig)); + } + + @Override + public void putAll(Map<OpTreeSignature, OperatorStats> map) { + this.map.putAll(map); /* 'this.' is required: the parameter shadows the backing field */ + } + + } + + private static final Logger LOG = LoggerFactory.getLogger(StatsSources.class); + + public static StatsSource getStatsSourceContaining(StatsSource currentStatsSource, PlanMapper pm) { + if (currentStatsSource instanceof CachingStatsSource) { + CachingStatsSource sessionStatsSource = (CachingStatsSource) currentStatsSource; + loadFromPlanMapper(sessionStatsSource, pm); + return sessionStatsSource; + } else { + return new SimpleRuntimeStatsSource(pm); + } + } + + public static void loadFromPlanMapper(CachingStatsSource sessionStatsSource, PlanMapper pm) { + Map<OpTreeSignature, OperatorStats> map = extractStatMapFromPlanMapper(pm); + sessionStatsSource.putAll(map); + } + + + private static Map<OpTreeSignature, 
OperatorStats> extractStatMapFromPlanMapper(PlanMapper pm) { + Map<OpTreeSignature, OperatorStats> map = new HashMap<OpTreeSignature, OperatorStats>(); + Iterator<EquivGroup> it = pm.iterateGroups(); + while (it.hasNext()) { + EquivGroup e = it.next(); + List<OperatorStats> stat = e.getAll(OperatorStats.class); + List<OpTreeSignature> sig = e.getAll(OpTreeSignature.class); + + if (stat.size() > 1 || sig.size() > 1) { + StringBuffer sb = new StringBuffer(); + sb.append(String.format("expected(stat-sig) 1-1, got %d-%d ;", stat.size(), sig.size())); /* String.format takes %d; '{}' is an SLF4J-only placeholder */ + for (OperatorStats s : stat) { + sb.append(s); + sb.append(";"); + } + for (OpTreeSignature s : sig) { + sb.append(s); + sb.append(";"); + } + LOG.debug(sb.toString()); + } + if (stat.size() >= 1 && sig.size() >= 1) { + map.put(sig.get(0), stat.get(0)); + } + } + return map; + } + + private static StatsSource globalStatsSource; + + public static StatsSource globalStatsSource(HiveConf conf) { + if (globalStatsSource == null) { + globalStatsSource = new CachingStatsSource(conf); + } + return globalStatsSource; + } + + @VisibleForTesting + public static void clearAllStats() { + globalStatsSource = null; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java index 2b0d23c..be62fc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/IReExecutionPlugin.java @@ -59,6 +59,7 @@ public interface IReExecutionPlugin { */ boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper); + void afterExecute(PlanMapper planMapper, boolean successfull); } 
http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java index 8a5595d..501f0b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java @@ -156,6 +156,9 @@ public class ReExecDriver implements IDriver { LOG.info("Execution #{} of query", executionIndex); CommandProcessorResponse cpr = coreDriver.run(); + PlanMapper oldPlanMapper = coreDriver.getPlanMapper(); + afterExecute(oldPlanMapper, cpr.getResponseCode() == 0); + boolean shouldReExecute = explainReOptimization && executionIndex==1; shouldReExecute |= cpr.getResponseCode() != 0 && shouldReExecute(); @@ -164,25 +167,34 @@ public class ReExecDriver implements IDriver { } LOG.info("Preparing to re-execute query"); prepareToReExecute(); - PlanMapper oldPlanMapper = coreDriver.getPlanMapper(); CommandProcessorResponse compile_resp = coreDriver.compileAndRespond(currentQuery); if (compile_resp.failed()) { + LOG.error("Recompilation of the query failed; this is unexpected."); // FIXME: somehow place pointers that re-execution compilation have failed; the query have been successfully compiled before? return compile_resp; } PlanMapper newPlanMapper = coreDriver.getPlanMapper(); if (!explainReOptimization && !shouldReExecuteAfterCompile(oldPlanMapper, newPlanMapper)) { + LOG.info("re-running the query would probably not yield better results; returning with last error"); // FIXME: retain old error; or create a new one? 
return cpr; } } } + private void afterExecute(PlanMapper planMapper, boolean success) { + for (IReExecutionPlugin p : plugins) { + p.afterExecute(planMapper, success); + } + } + private boolean shouldReExecuteAfterCompile(PlanMapper oldPlanMapper, PlanMapper newPlanMapper) { boolean ret = false; for (IReExecutionPlugin p : plugins) { - ret |= p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper); + boolean shouldReExecute = p.shouldReExecute(executionIndex, oldPlanMapper, newPlanMapper); + LOG.debug("{}.shouldReExecuteAfterCompile = {}", p, shouldReExecute); + ret |= shouldReExecute; } return ret; } @@ -190,7 +202,9 @@ public class ReExecDriver implements IDriver { private boolean shouldReExecute() { boolean ret = false; for (IReExecutionPlugin p : plugins) { - ret |= p.shouldReExecute(executionIndex); + boolean shouldReExecute = p.shouldReExecute(executionIndex); + LOG.debug("{}.shouldReExecute = {}", p, shouldReExecute); + ret |= shouldReExecute; } return ret; } http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java index 4ee3c14..950903c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionOverlayPlugin.java @@ -80,4 +80,8 @@ public class ReExecutionOverlayPlugin implements IReExecutionPlugin { public void beforeExecute(int executionIndex, boolean explainReOptimization) { } + @Override + public void afterExecute(PlanMapper planMapper, boolean success) { + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java index f731315..409cc73 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReOptimizePlugin.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.reexec; import java.util.Iterator; import java.util.List; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.exec.Operator; @@ -29,7 +30,8 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; -import org.apache.hadoop.hive.ql.plan.mapper.SimpleRuntimeStatsSource; +import org.apache.hadoop.hive.ql.plan.mapper.StatsSource; +import org.apache.hadoop.hive.ql.plan.mapper.StatsSources; import org.apache.hadoop.hive.ql.stats.OperatorStatsReaderHook; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +49,8 @@ public class ReOptimizePlugin implements IReExecutionPlugin { private OperatorStatsReaderHook statsReaderHook; + private boolean alwaysCollectStats; + class LocalHook implements ExecuteWithHookContext { @Override @@ -62,10 +66,10 @@ public class ReOptimizePlugin implements IReExecutionPlugin { if (message.contains("Vertex failed,") && isOOM) { retryPossible = true; } - System.out.println(exception); } } } + LOG.info("ReOptimization: retryPossible: {}", retryPossible); } } } @@ -77,9 +81,25 @@ public class ReOptimizePlugin implements IReExecutionPlugin { statsReaderHook = new OperatorStatsReaderHook(); coreDriver.getHookRunner().addOnFailureHook(statsReaderHook); 
coreDriver.getHookRunner().addPostHook(statsReaderHook); - // statsReaderHook.setCollectOnSuccess(true); - statsReaderHook.setCollectOnSuccess( - driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS)); + alwaysCollectStats = driver.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS); + statsReaderHook.setCollectOnSuccess(alwaysCollectStats); + + coreDriver.setStatsSource(getStatsSource(driver.getConf())); + } + + static enum StatsSourceMode { + query, hiveserver; + } + + private StatsSource getStatsSource(HiveConf conf) { + StatsSourceMode mode = StatsSourceMode.valueOf(conf.getVar(ConfVars.HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE)); + switch (mode) { + case query: + return new StatsSources.MapBackedStatsSource(); + case hiveserver: + return StatsSources.globalStatsSource(conf); + } + throw new RuntimeException("Unknown StatsSource setting: " + mode); } @Override @@ -90,17 +110,19 @@ public class ReOptimizePlugin implements IReExecutionPlugin { @Override public void prepareToReExecute() { statsReaderHook.setCollectOnSuccess(true); - PlanMapper pm = coreDriver.getContext().getPlanMapper(); - coreDriver.setStatsSource(new SimpleRuntimeStatsSource(pm)); retryPossible = false; + coreDriver.setStatsSource( + StatsSources.getStatsSourceContaining(coreDriver.getStatsSource(), coreDriver.getPlanMapper())); } @Override public boolean shouldReExecute(int executionNum, PlanMapper oldPlanMapper, PlanMapper newPlanMapper) { - return planDidChange(oldPlanMapper, newPlanMapper); + boolean planDidChange = !planEquals(oldPlanMapper, newPlanMapper); + LOG.info("planDidChange: {}", planDidChange); + return planDidChange; } - private boolean planDidChange(PlanMapper pmL, PlanMapper pmR) { + private boolean planEquals(PlanMapper pmL, PlanMapper pmR) { List<Operator> opsL = getRootOps(pmL); List<Operator> opsR = getRootOps(pmR); for (Iterator<Operator> itL = opsL.iterator(); itL.hasNext();) { @@ -135,4 +157,12 @@ 
public class ReOptimizePlugin implements IReExecutionPlugin { } } + @Override + public void afterExecute(PlanMapper planMapper, boolean success) { + if (alwaysCollectStats) { + coreDriver.setStatsSource( + StatsSources.getStatsSourceContaining(coreDriver.getStatsSource(), planMapper)); + } + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java index 0afc533..b09aafb 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/signature/TestOperatorSignature.java @@ -77,12 +77,16 @@ public class TestOperatorSignature { Operator<TableScanDesc> ts = getTsOp(i); Operator<? 
extends OperatorDesc> fil = getFilterOp(j); - ts.getChildOperators().add(fil); - fil.getParentOperators().add(ts); + connectOperators(ts, fil); return fil; } + private void connectOperators(Operator<?> parent, Operator<?> child) { + parent.getChildOperators().add(child); + child.getParentOperators().add(parent); + } + @Test public void testTableScand() { Operator<TableScanDesc> t1 = getTsOp(3); @@ -157,4 +161,5 @@ public class TestOperatorSignature { } + } http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java index 18aeb33..8126970 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/plan/mapping/TestCounterMapping.java @@ -172,7 +172,6 @@ public class TestCounterMapping { } private static IDriver createDriver() { - // HiveConf conf = new HiveConf(Driver.class); HiveConf conf = env_setup.getTestCtx().hiveConf; conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ENABLED, true); conf.setBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ALWAYS_COLLECT_OPERATOR_STATS, true); http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/test/queries/clientpositive/runtime_stats_hs2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/runtime_stats_hs2.q b/ql/src/test/queries/clientpositive/runtime_stats_hs2.q new file mode 100644 index 0000000..34a8dd3 --- /dev/null +++ b/ql/src/test/queries/clientpositive/runtime_stats_hs2.q @@ -0,0 +1,22 @@ + +create table tx(a int,u int); +insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10); + +create table px(a int,p int); +insert into px values 
(2,2),(3,3),(5,5),(7,7),(11,11); + +set hive.explain.user=true; +set hive.query.reexecution.enabled=true; +set hive.query.reexecution.always.collect.operator.stats=true; +set hive.query.reexecution.strategies=overlay,reoptimize; +set hive.query.reexecution.stats.persist.scope=hiveserver; + +-- join output estimate is underestimated: 1 row +explain +select sum(u*p) from tx join px on (u=p) where u<10 and p>2; + +select sum(u*p) from tx join px on (u=p) where u<10 and p>2; + +-- join output estimate is 3 rows ; all the operators stats are "runtime" +explain +select sum(u*p) from tx join px on (u=p) where u<10 and p>2; http://git-wip-us.apache.org/repos/asf/hive/blob/d0769c57/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out b/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out new file mode 100644 index 0000000..4d60b8c --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/runtime_stats_hs2.q.out @@ -0,0 +1,141 @@ +PREHOOK: query: create table tx(a int,u int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tx +POSTHOOK: query: create table tx(a int,u int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tx +PREHOOK: query: insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tx +POSTHOOK: query: insert into tx values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(10,10) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tx +POSTHOOK: Lineage: tx.a SCRIPT [] +POSTHOOK: Lineage: tx.u SCRIPT [] +PREHOOK: query: create table px(a int,p int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@px +POSTHOOK: query: create 
table px(a int,p int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@px +PREHOOK: query: insert into px values (2,2),(3,3),(5,5),(7,7),(11,11) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@px +POSTHOOK: query: insert into px values (2,2),(3,3),(5,5),(7,7),(11,11) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@px +POSTHOOK: Lineage: px.a SCRIPT [] +POSTHOOK: Lineage: px.p SCRIPT [] +PREHOOK: query: explain +select sum(u*p) from tx join px on (u=p) where u<10 and p>2 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select sum(u*p) from tx join px on (u=p) where u<10 and p>2 +POSTHOOK: type: QUERY +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) +Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:-1 + Stage-1 + Reducer 3 llap + File Output Operator [FS_15] + Group By Operator [GBY_13] (rows=1 width=8) + Output:["_col0"],aggregations:["sum(VALUE._col0)"] + <-Reducer 2 [CUSTOM_SIMPLE_EDGE] llap + PARTITION_ONLY_SHUFFLE [RS_12] + Group By Operator [GBY_11] (rows=1 width=8) + Output:["_col0"],aggregations:["sum(_col0)"] + Select Operator [SEL_9] (rows=1 width=8) + Output:["_col0"] + Merge Join Operator [MERGEJOIN_20] (rows=1 width=8) + Conds:RS_6._col0=RS_7._col0(Inner),Output:["_col0","_col1"] + <-Map 1 [SIMPLE_EDGE] llap + SHUFFLE [RS_6] + PartitionCols:_col0 + Select Operator [SEL_2] (rows=1 width=4) + Output:["_col0"] + Filter Operator [FIL_18] (rows=1 width=4) + predicate:((u < 10) and (u > 2)) + TableScan [TS_0] (rows=8 width=4) + default@tx,tx,Tbl:COMPLETE,Col:COMPLETE,Output:["u"] + <-Map 4 [SIMPLE_EDGE] llap + SHUFFLE [RS_7] + PartitionCols:_col0 + Select Operator [SEL_5] (rows=1 width=4) + Output:["_col0"] + Filter Operator [FIL_19] (rows=1 width=4) + predicate:((p < 10) and (p > 2)) + TableScan [TS_3] (rows=5 width=4) + 
default@px,px,Tbl:COMPLETE,Col:COMPLETE,Output:["p"] + +PREHOOK: query: select sum(u*p) from tx join px on (u=p) where u<10 and p>2 +PREHOOK: type: QUERY +PREHOOK: Input: default@px +PREHOOK: Input: default@tx +#### A masked pattern was here #### +POSTHOOK: query: select sum(u*p) from tx join px on (u=p) where u<10 and p>2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@px +POSTHOOK: Input: default@tx +#### A masked pattern was here #### +83 +PREHOOK: query: explain +select sum(u*p) from tx join px on (u=p) where u<10 and p>2 +PREHOOK: type: QUERY +POSTHOOK: query: explain +select sum(u*p) from tx join px on (u=p) where u<10 and p>2 +POSTHOOK: type: QUERY +Plan optimized by CBO. + +Vertex dependency in root stage +Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) +Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:-1 + Stage-1 + Reducer 3 llap + File Output Operator [FS_15] + Group By Operator [GBY_13] (runtime: rows=1 width=8) + Output:["_col0"],aggregations:["sum(VALUE._col0)"] + <-Reducer 2 [CUSTOM_SIMPLE_EDGE] llap + PARTITION_ONLY_SHUFFLE [RS_12] + Group By Operator [GBY_11] (runtime: rows=1 width=8) + Output:["_col0"],aggregations:["sum(_col0)"] + Select Operator [SEL_9] (runtime: rows=3 width=8) + Output:["_col0"] + Merge Join Operator [MERGEJOIN_20] (runtime: rows=3 width=8) + Conds:RS_6._col0=RS_7._col0(Inner),Output:["_col0","_col1"] + <-Map 1 [SIMPLE_EDGE] llap + SHUFFLE [RS_6] + PartitionCols:_col0 + Select Operator [SEL_2] (runtime: rows=5 width=4) + Output:["_col0"] + Filter Operator [FIL_18] (runtime: rows=5 width=4) + predicate:((u < 10) and (u > 2)) + TableScan [TS_0] (runtime: rows=8 width=4) + default@tx,tx,Tbl:COMPLETE,Col:COMPLETE,Output:["u"] + <-Map 4 [SIMPLE_EDGE] llap + SHUFFLE [RS_7] + PartitionCols:_col0 + Select Operator [SEL_5] (runtime: rows=3 width=4) + Output:["_col0"] + Filter Operator [FIL_19] (runtime: rows=3 width=4) + predicate:((p < 10) and (p > 2)) + TableScan [TS_3] (runtime: rows=5 width=4) 
+ default@px,px,Tbl:COMPLETE,Col:COMPLETE,Output:["p"] +