http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 1688524..3cf8b71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -35,7 +35,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -46,6 +48,9 @@ import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -77,6 +82,9 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   protected final AtomicBoolean abortOp;
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
+  protected transient long runTimeNumRows;
+  protected int indexForTezUnion = -1;
+  private transient Configuration hconf;

   protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();

   // It can be optimized later so that an operator operator (init/close) is performed
@@ -476,7 +484,9 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
    * Operator specific initialization.
    */
   protected void initializeOp(Configuration hconf) throws HiveException {
+    this.hconf = hconf;
     rootInitializeCalled = true;
+    runTimeNumRows = 0;
   }

   /**
@@ -715,6 +725,10 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
    * should overwrite this funtion for their specific cleanup routine.
    */
   protected void closeOp(boolean abort) throws HiveException {
+    if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
+      publishRunTimeStats();
+    }
+    runTimeNumRows = 0;
   }

   private boolean jobCloseDone = false;
@@ -869,7 +883,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C

   protected void forward(Object row, ObjectInspector rowInspector)
       throws HiveException {
-
+    runTimeNumRows++;
     if (getDone()) {
       return;
     }
@@ -1425,4 +1439,38 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   public CompilationOpContext getCompilationOpContext() {
     return cContext;
   }
+
+  private void publishRunTimeStats() throws HiveException {
+    StatsPublisher statsPublisher = new FSStatsPublisher();
+    StatsCollectionContext sContext = new StatsCollectionContext(hconf);
+    sContext.setIndexForTezUnion(indexForTezUnion);
+    sContext.setStatsTmpDir(conf.getRuntimeStatsTmpDir());
+
+    if (!statsPublisher.connect(sContext)) {
+      LOG.error("StatsPublishing error: cannot connect to database");
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
+    }
+
+    String prefix = "";
+    Map<String, String> statsToPublish = new HashMap<String, String>();
+    statsToPublish.put(StatsSetupConst.RUN_TIME_ROW_COUNT, Long.toString(runTimeNumRows));
+    if (!statsPublisher.publishStat(prefix, statsToPublish)) {
+      // The original exception is lost.
+      // Not changing the interface to maintain backward compatibility
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg());
+    }
+    if (!statsPublisher.closeConnection(sContext)) {
+      // The original exception is lost.
+      // Not changing the interface to maintain backward compatibility
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg());
+    }
+  }
+
+  public int getIndexForTezUnion() {
+    return indexForTezUnion;
+  }
+
+  public void setIndexForTezUnion(int indexForTezUnion) {
+    this.indexForTezUnion = indexForTezUnion;
+  }
 }
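A note on the closeOp() change above: publishRunTimeStats() only runs when control reaches the base-class method, so any operator that overrides closeOp() must chain to super.closeOp(abort), otherwise its runtime row count is silently dropped. The TableScanOperator and UDTFOperator hunks later in this commit add exactly that call. A minimal sketch of the expected pattern in a hypothetical subclass:

    @Override
    protected void closeOp(boolean abort) throws HiveException {
      // operator-specific cleanup first (hypothetical helper)
      releaseResources();
      // then let Operator publish RUN_TIME_ROW_COUNT to the runtime
      // stats tmp dir when EXPLAIN ANALYZE has configured one
      super.closeOp(abort);
    }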
http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index fa23b01..c4490f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -548,6 +548,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       // forward is not called
       if (null != out) {
         numRows++;
+        runTimeNumRows++;
         if (isLogInfoEnabled) {
           if (numRows == cntr) {
             cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index 277683e..247d589 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -546,6 +546,28 @@ public class SerializationUtilities {
     return result;
   }

+  public static List<Operator<?>> cloneOperatorTree(List<Operator<?>> roots, int indexForTezUnion) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    CompilationOpContext ctx = roots.isEmpty() ? null : roots.get(0).getCompilationOpContext();
+    serializePlan(roots, baos, true);
+    @SuppressWarnings("unchecked")
+    List<Operator<?>> result =
+        deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+            roots.getClass(), true);
+    // Restore the context.
+    LinkedList<Operator<?>> newOps = new LinkedList<>(result);
+    while (!newOps.isEmpty()) {
+      Operator<?> newOp = newOps.poll();
+      newOp.setIndexForTezUnion(indexForTezUnion);
+      newOp.setCompilationOpContext(ctx);
+      List<Operator<?>> children = newOp.getChildOperators();
+      if (children != null) {
+        newOps.addAll(children);
+      }
+    }
+    return result;
+  }
+
   /**
    * Clones using the powers of XML. Do not use unless necessary.
    * @param plan The plan.
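For orientation: the new cloneOperatorTree(List, int) overload above round-trips the operator DAG through Kryo (serializePlan/deserializePlan) and then walks the clones to restore the shared CompilationOpContext and stamp each operator with a Tez union branch index. Its caller is GenTezUtils.removeUnionOperators, further down in this commit; a sketch of the call-site shape, with roots and indexForTezUnion supplied by that context:

    // clone the plan rooted at 'roots'; each cloned operator is tagged with
    // the union branch it belongs to, so runtime row counts collected from
    // different Tez union branches can be kept apart
    List<Operator<?>> newRoots =
        SerializationUtilities.cloneOperatorTree(roots, indexForTezUnion);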
http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 602ed1a..483fc3a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
@@ -87,7 +88,9 @@ public class StatsTask extends Task<StatsWork> implements Serializable {

   @Override
   public int execute(DriverContext driverContext) {
-
+    if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
+      return 0;
+    }
     LOG.info("Executing stats task");
     // Make sure that it is either an ANALYZE, INSERT OVERWRITE (maybe load) or CTAS command
     short workComponentsPresent = 0;
@@ -147,7 +150,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
     if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
       try {
         scc = getContext();
-        statsAggregator = createStatsAggregator(scc);
+        statsAggregator = createStatsAggregator(scc, conf);
       } catch (HiveException e) {
         if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
           throw e;
@@ -294,7 +297,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
     return prefix;
   }

-  private StatsAggregator createStatsAggregator(StatsCollectionContext scc) throws HiveException {
+  private StatsAggregator createStatsAggregator(StatsCollectionContext scc, HiveConf conf) throws HiveException {
     String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
     StatsFactory factory = StatsFactory.newFactory(statsImpl, conf);
     if (factory == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 6afe957..0f02222 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -267,6 +267,7 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
         publishStats();
       }
     }
+    super.closeOp(abort);
   }

   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
index a75b52a..0d1fa31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
@@ -150,5 +150,6 @@ public class UDTFOperator extends Operator<UDTFDesc> implements Serializable {
   @Override
   protected void closeOp(boolean abort) throws HiveException {
     conf.getGenericUDTF().close();
+    super.closeOp(abort);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 5b8afb5..2dd03f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
@@ -137,98 +138,111 @@ public class ATSHook implements ExecuteWithHookContext {

   @Override
   public void run(final HookContext hookContext) throws Exception {
-    final long currentTime = System.currentTimeMillis();
-    final HiveConf conf = new HiveConf(hookContext.getConf());
-    final QueryState queryState = hookContext.getQueryState();
-    final String queryId = queryState.getQueryId();
-
-    final Map<String, Long> durations = new HashMap<String, Long>();
-    for (String key : hookContext.getPerfLogger().getEndTimes().keySet()) {
-      durations.put(key, hookContext.getPerfLogger().getDuration(key));
-    }
-
-    try {
-      setupAtsExecutor(conf);
+      final long currentTime = System.currentTimeMillis();
+      final HiveConf conf = new HiveConf(hookContext.getConf());
+      final QueryState queryState = hookContext.getQueryState();
+      final String queryId = queryState.getQueryId();
+
+      final Map<String, Long> durations = new HashMap<String, Long>();
+      for (String key : hookContext.getPerfLogger().getEndTimes().keySet()) {
+        durations.put(key, hookContext.getPerfLogger().getDuration(key));
+      }

-      executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            QueryPlan plan = hookContext.getQueryPlan();
-            if (plan == null) {
-              return;
-            }
-            String queryId = plan.getQueryId();
-            String opId = hookContext.getOperationId();
-            long queryStartTime = plan.getQueryStartTime();
-            String user = hookContext.getUgi().getShortUserName();
-            String requestuser = hookContext.getUserName();
-            if (hookContext.getUserName() == null ){
-              requestuser = hookContext.getUgi().getUserName() ;
-            }
-            int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
-            int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
-            if (numMrJobs + numTezJobs <= 0) {
-              return; // ignore client only queries
-            }
-
-            switch(hookContext.getHookType()) {
-            case PRE_EXEC_HOOK:
-              ExplainWork work = new ExplainWork(null,// resFile
-                  null,// pCtx
-                  plan.getRootTasks(),// RootTasks
-                  plan.getFetchTask(),// FetchTask
-                  null,// analyzer
-                  false,// extended
-                  true,// formatted
-                  false,// dependency
-                  false,// logical
-                  false,// authorize
-                  false,// userLevelExplain
-                  null// cboInfo
-              );
-              @SuppressWarnings("unchecked")
-              ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
-              explain.initialize(queryState, plan, null, null);
-              String query = plan.getQueryStr();
-              JSONObject explainPlan = explain.getJSONPlan(null, work);
-              String logID = conf.getLogIdVar(hookContext.getSessionId());
-              List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
-              List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
-              String executionMode = getExecutionMode(plan).name();
-              String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
-              if (hiveInstanceAddress == null) {
-                hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
-              }
-              String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
-              ApplicationId llapId = determineLlapId(conf, plan);
-              fireAndForget(
-                  createPreHookEvent(queryId, query, explainPlan, queryStartTime,
-                      user, requestuser, numMrJobs, numTezJobs, opId,
-                      hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
-                      hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode,
-                      tablesRead, tablesWritten, conf));
-              break;
-            case POST_EXEC_HOOK:
-              fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, durations));
-              break;
-            case ON_FAILURE_HOOK:
-              fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, durations));
-              break;
-            default:
-              //ignore
-              break;
-            }
-          } catch (Exception e) {
-            LOG.warn("Failed to submit plan to ATS for " + queryId, e);
-          }
+      try {
+        setupAtsExecutor(conf);
+
+        executor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              QueryPlan plan = hookContext.getQueryPlan();
+              if (plan == null) {
+                return;
+              }
+              String queryId = plan.getQueryId();
+              String opId = hookContext.getOperationId();
+              long queryStartTime = plan.getQueryStartTime();
+              String user = hookContext.getUgi().getUserName();
+              String requestuser = hookContext.getUserName();
+              if (hookContext.getUserName() == null) {
+                requestuser = hookContext.getUgi().getUserName();
+              }
+              int numMrJobs = Utilities.getMRTasks(
+                  plan.getRootTasks()).size();
+              int numTezJobs = Utilities.getTezTasks(
+                  plan.getRootTasks()).size();
+              if (numMrJobs + numTezJobs <= 0) {
+                return; // ignore client only queries
+              }
+
+              switch (hookContext.getHookType()) {
+              case PRE_EXEC_HOOK:
+                ExplainConfiguration config = new ExplainConfiguration();
+                config.setFormatted(true);
+                ExplainWork work = new ExplainWork(null,// resFile
+                    null,// pCtx
+                    plan.getRootTasks(),// RootTasks
+                    plan.getFetchTask(),// FetchTask
+                    null,// analyzer
+                    config, // explainConfig
+                    null// cboInfo
+                );
+                @SuppressWarnings("unchecked")
+                ExplainTask explain = (ExplainTask) TaskFactory
+                    .get(work, conf);
+                explain.initialize(queryState, plan, null, null);
+                String query = plan.getQueryStr();
+                JSONObject explainPlan = explain.getJSONPlan(null,
+                    work);
+                String logID = conf.getLogIdVar(hookContext.getSessionId());
+                List<String> tablesRead = getTablesFromEntitySet(hookContext
+                    .getInputs());
+                List<String> tablesWritten = getTablesFromEntitySet(hookContext
+                    .getOutputs());
+                String executionMode = getExecutionMode(plan)
+                    .name();
+                String hiveInstanceAddress = hookContext
+                    .getHiveInstanceAddress();
+                if (hiveInstanceAddress == null) {
+                  hiveInstanceAddress = InetAddress
+                      .getLocalHost().getHostAddress();
+                }
+                String hiveInstanceType = hookContext
+                    .isHiveServerQuery() ? "HS2" : "CLI";
+                ApplicationId llapId = determineLlapId(conf, plan);
+                fireAndForget(createPreHookEvent(queryId, query,
+                    explainPlan, queryStartTime, user,
+                    requestuser, numMrJobs, numTezJobs, opId,
+                    hookContext.getIpAddress(),
+                    hiveInstanceAddress, hiveInstanceType,
+                    hookContext.getSessionId(),
+                    logID,
+                    hookContext.getThreadId(), executionMode,
+                    tablesRead, tablesWritten, conf));
+                break;
+              case POST_EXEC_HOOK:
+                fireAndForget(createPostHookEvent(queryId,
+                    currentTime, user, requestuser, true, opId,
+                    durations));
+                break;
+              case ON_FAILURE_HOOK:
+                fireAndForget(createPostHookEvent(queryId,
+                    currentTime, user, requestuser, false,
+                    opId, durations));
+                break;
+              default:
+                // ignore
+                break;
+              }
+            } catch (Exception e) {
+              LOG.warn("Failed to submit to ATS for " + queryId, e);
+            }
+          }
+        });
+      } catch (Exception e) {
+        LOG.warn("Failed to submit to ATS for " + queryId, e);
+      }
   }
-
   protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
     List<String> tableNames = new ArrayList<String>();
     for (Entity entity : entities) {

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 8c44933..c6287e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -219,7 +219,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
       transformations.add(new StatsOptimizer());
     }
-    if (pctx.getContext().getExplain() && !isTezExecEngine && !isSparkExecEngine) {
+    if (pctx.getContext().isExplainSkipExecution() && !isTezExecEngine && !isSparkExecEngine) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }
@@ -231,6 +231,7 @@ public class Optimizer {
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEFETCHTASKAGGR)) {
       transformations.add(new SimpleFetchAggregation());
     }
+
   }

   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
new file mode 100644
index 0000000..ee67443
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher;
+
+public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver {
+  private static final Logger LOG = LoggerFactory.getLogger(AnnotateRunTimeStatsOptimizer.class);
+
+  private class AnnotateRunTimeStatsDispatcher implements Dispatcher {
+
+    private final PhysicalContext physicalContext;
+
+    public AnnotateRunTimeStatsDispatcher(PhysicalContext context, Map<Rule, NodeProcessor> rules) {
+      super();
+      physicalContext = context;
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      Set<Operator<? extends OperatorDesc>> ops = new HashSet<>();
+
+      if (currTask instanceof MapRedTask) {
+        MapRedTask mr = (MapRedTask) currTask;
+        ops.addAll(mr.getWork().getAllOperators());
+      } else if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w : work.getAllWork()) {
+          ops.addAll(w.getAllOperators());
+        }
+      } else if (currTask instanceof SparkTask) {
+        SparkWork sparkWork = (SparkWork) currTask.getWork();
+        for (BaseWork w : sparkWork.getAllWork()) {
+          ops.addAll(w.getAllOperators());
+        }
+      }
+
+      setOrAnnotateStats(ops, physicalContext.getParseContext());
+      return null;
+    }
+
+  }
+
+  public static void setOrAnnotateStats(Set<Operator<? extends OperatorDesc>> ops, ParseContext pctx)
+      throws SemanticException {
+    for (Operator<? extends OperatorDesc> op : ops) {
+      if (pctx.getContext().getExplainAnalyze() == AnalyzeState.RUNNING) {
+        setRuntimeStatsDir(op, pctx);
+      } else if (pctx.getContext().getExplainAnalyze() == AnalyzeState.ANALYZING) {
+        annotateRuntimeStats(op, pctx);
+      } else {
+        throw new SemanticException("Unexpected stats in AnnotateWithRunTimeStatistics.");
+      }
+    }
+  }
+
+  private static void setRuntimeStatsDir(Operator<? extends OperatorDesc> op, ParseContext pctx)
+      throws SemanticException {
+    try {
+      OperatorDesc conf = op.getConf();
+      if (conf != null) {
+        LOG.info("setRuntimeStatsDir for " + op.getOperatorId());
+        String path = new Path(pctx.getContext().getExplainConfig().getExplainRootPath(),
+            op.getOperatorId()).toString();
+        StatsPublisher statsPublisher = new FSStatsPublisher();
+        StatsCollectionContext runtimeStatsContext = new StatsCollectionContext(pctx.getConf());
+        runtimeStatsContext.setStatsTmpDir(path);
+        if (!statsPublisher.init(runtimeStatsContext)) {
+          LOG.error("StatsPublishing error: StatsPublisher is not initialized.");
+          throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
+        }
+        conf.setRuntimeStatsTmpDir(path);
+      } else {
+        LOG.debug("skip setRuntimeStatsDir for " + op.getOperatorId()
+            + " because OperatorDesc is null");
+      }
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private static void annotateRuntimeStats(Operator<? extends OperatorDesc> op, ParseContext pctx) {
+    Long runTimeNumRows = pctx.getContext().getExplainConfig().getOpIdToRuntimeNumRows()
+        .get(op.getOperatorId());
+    if (op.getConf() != null && op.getConf().getStatistics() != null && runTimeNumRows != null) {
+      LOG.info("annotateRuntimeStats for " + op.getOperatorId());
+      op.getConf().getStatistics().setRunTimeNumRows(runTimeNumRows);
+    } else {
+      LOG.debug("skip annotateRuntimeStats for " + op.getOperatorId());
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    Dispatcher disp = new AnnotateRunTimeStatsDispatcher(pctx, opRules);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  public void resolve(Set<Operator<?>> opSet, ParseContext pctx) throws SemanticException {
+    Set<Operator<?>> ops = getAllOperatorsForSimpleFetch(opSet);
+    setOrAnnotateStats(ops, pctx);
+  }
+
+  private Set<Operator<?>> getAllOperatorsForSimpleFetch(Set<Operator<?>> opSet) {
+    Set<Operator<?>> returnSet = new LinkedHashSet<Operator<?>>();
+    Stack<Operator<?>> opStack = new Stack<Operator<?>>();
+    // add all children
+    opStack.addAll(opSet);
+    while (!opStack.empty()) {
+      Operator<?> op = opStack.pop();
+      returnSet.add(op);
+      if (op.getChildOperators() != null) {
+        opStack.addAll(op.getChildOperators());
+      }
+    }
+    return returnSet;
+  }
+}
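To make the two phases above concrete: in the RUNNING state, setRuntimeStatsDir() assigns every operator a stats tmp directory named after its operator id under the explain root path; publishRunTimeStats() (see the Operator.java hunk) writes RUN_TIME_ROW_COUNT there, and the ANALYZING state reads the counts back into each operator's Statistics. A sketch of the resulting layout; the operator ids shown are illustrative, real ones come from Operator.getOperatorId():

    <explainRootPath>/      (ctx.getMRTmpPath(), chosen by ExplainSemanticAnalyzer)
      TS_0/                 (stats files written by FSStatsPublisher)
      SEL_1/
      GBY_2/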
http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 49706b1..9377563 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -88,12 +88,17 @@ public class PhysicalOptimizer {
     // Vectorization should be the last optimization, because it doesn't modify the plan
     // or any operators. It makes a very low level transformation to the expressions to
     // run in the vectorized mode.
-    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
+        && pctx.getContext().getExplainAnalyze() == null) {
       resolvers.add(new Vectorizer());
     }
     if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) {
       resolvers.add(new StageIDsRearranger());
     }
+
+    if (pctx.getContext().getExplainAnalyze() != null) {
+      resolvers.add(new AnnotateRunTimeStatsOptimizer());
+    }
   }

   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
index c3e8bf5..80e62c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
@@ -67,10 +67,11 @@ public class ColumnStatsAutoGatherContext {
   private boolean isInsertInto;
   private Table tbl;
   private Map<String, String> partSpec;
+  private Context origCtx;

   public ColumnStatsAutoGatherContext(SemanticAnalyzer sa, HiveConf conf,
       Operator<? extends OperatorDesc> op, Table tbl, Map<String, String> partSpec,
-      boolean isInsertInto) throws SemanticException {
+      boolean isInsertInto, Context ctx) throws SemanticException {
     super();
     this.sa = sa;
     this.conf = conf;
@@ -78,6 +79,7 @@ public class ColumnStatsAutoGatherContext {
     this.tbl = tbl;
     this.partSpec = partSpec;
     this.isInsertInto = isInsertInto;
+    this.origCtx = ctx;
     columns = tbl.getCols();
     partitionColumns = tbl.getPartCols();
   }
@@ -107,7 +109,7 @@ public class ColumnStatsAutoGatherContext {
     // 2. Based on the statement, generate the selectOperator
     Operator<?> selOp = null;
     try {
-      selOp = genSelOpForAnalyze(analyzeCommand);
+      selOp = genSelOpForAnalyze(analyzeCommand, origCtx);
     } catch (IOException | ParseException e) {
       throw new SemanticException(e);
     }
@@ -126,10 +128,13 @@ public class ColumnStatsAutoGatherContext {
   }

   @SuppressWarnings("rawtypes")
-  private Operator genSelOpForAnalyze(String analyzeCommand) throws IOException, ParseException, SemanticException{
+  private Operator genSelOpForAnalyze(String analyzeCommand, Context origCtx) throws IOException, ParseException, SemanticException{
     //0. initialization
     Context ctx = new Context(conf);
-    ASTNode tree = ParseUtils.parse(analyzeCommand, ctx);
+    ctx.setExplainConfig(origCtx.getExplainConfig());
+    ParseDriver pd = new ParseDriver();
+    ASTNode tree = pd.parse(analyzeCommand, ctx);
+    tree = ParseUtils.findRootNonNullToken(tree);

     //1. get the ColumnStatsSemanticAnalyzer
     BaseSemanticAnalyzer baseSem = SemanticAnalyzerFactory.get(new QueryState(conf), tree);

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
index 242a880..93b8183 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
@@ -405,6 +405,7 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
       analyzeRewrite.setColType(colType);
       qbp.setAnalyzeRewrite(analyzeRewrite);
       initCtx(ctx);
+      ctx.setExplainConfig(origCtx.getExplainConfig());
       LOG.info("Invoking analyze on rewritten query");
       analyzeInternal(rewrittenTree);
     } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java
new file mode 100644
index 0000000..4a8ff15
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * ExplainConfiguration
+ */
+
+public class ExplainConfiguration {
+  private boolean extended = false;
+  private boolean formatted = false;
+  private boolean dependency = false;
+  private boolean logical = false;
+  private boolean authorize = false;
+  private boolean userLevelExplain = false;
+  private Path explainRootPath;
+  private Map<String, Long> opIdToRuntimeNumRows;
+
+  public enum AnalyzeState {
+    RUNNING, ANALYZING
+  };
+
+  private AnalyzeState analyze = null;
+
+  public boolean isExtended() {
+    return extended;
+  }
+
+  public void setExtended(boolean extended) {
+    this.extended = extended;
+  }
+
+  public boolean isFormatted() {
+    return formatted;
+  }
+
+  public void setFormatted(boolean formatted) {
+    this.formatted = formatted;
+  }
+
+  public boolean isDependency() {
+    return dependency;
+  }
+
+  public void setDependency(boolean dependency) {
+    this.dependency = dependency;
+  }
+
+  public boolean isLogical() {
+    return logical;
+  }
+
+  public void setLogical(boolean logical) {
+    this.logical = logical;
+  }
+
+  public boolean isAuthorize() {
+    return authorize;
+  }
+
+  public void setAuthorize(boolean authorize) {
+    this.authorize = authorize;
+  }
+
+  public AnalyzeState getAnalyze() {
+    return analyze;
+  }
+
+  public void setAnalyze(AnalyzeState analyze) {
+    this.analyze = analyze;
+  }
+
+  public boolean isUserLevelExplain() {
+    return userLevelExplain;
+  }
+
+  public void setUserLevelExplain(boolean userLevelExplain) {
+    this.userLevelExplain = userLevelExplain;
+  }
+
+  public Path getExplainRootPath() {
+    return explainRootPath;
+  }
+
+  public void setExplainRootPath(Path explainRootPath) {
+    this.explainRootPath = explainRootPath;
+  }
+
+  public Map<String, Long> getOpIdToRuntimeNumRows() {
+    return opIdToRuntimeNumRows;
+  }
+
+  public void setOpIdToRuntimeNumRows(Map<String, Long> opIdToRuntimeNumRows) {
+    this.opIdToRuntimeNumRows = opIdToRuntimeNumRows;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
index 8d7fd92..7e24364 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
@@ -36,8 +36,7 @@ public class ExplainSQRewriteSemanticAnalyzer extends BaseSemanticAnalyzer {

   @Override
   public void analyzeInternal(ASTNode ast) throws SemanticException {
-
-    ctx.setExplain(true);
+    ctx.setExplainConfig(new ExplainConfiguration());

     // Create a semantic analyzer for the query
     ASTNode input = (ASTNode) ast.getChild(0);
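The ExplainConfiguration bean above replaces the six positional booleans that ExplainWork used to take, which is why the ATSHook and ExplainSQRewriteSemanticAnalyzer hunks now pass a single config object. A hedged sketch of how the EXPLAIN ANALYZE path (next file) populates it:

    ExplainConfiguration config = new ExplainConfiguration();
    config.setFormatted(true);                     // optional output flag
    config.setAnalyze(AnalyzeState.RUNNING);       // phase 1 of EXPLAIN ANALYZE
    config.setExplainRootPath(ctx.getMRTmpPath()); // root dir for per-op stats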
http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index 75753b0..e0a1d3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -18,18 +18,40 @@ package org.apache.hadoop.hive.ql.parse;

+import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-
+import java.util.Map;
+
+import org.antlr.runtime.TokenRewriteStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
+import org.apache.hadoop.hive.ql.stats.fs.FSStatsAggregator;

 /**
  * ExplainSemanticAnalyzer.
@@ -37,40 +59,67 @@ import org.apache.hadoop.hive.ql.plan.ExplainWork;
 */
 public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
   List<FieldSchema> fieldList;
+  ExplainConfiguration config;

   public ExplainSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
+    config = new ExplainConfiguration();
   }

   @SuppressWarnings("unchecked")
   @Override
   public void analyzeInternal(ASTNode ast) throws SemanticException {
-
-    boolean extended = false;
-    boolean formatted = false;
-    boolean dependency = false;
-    boolean logical = false;
-    boolean authorize = false;
     for (int i = 1; i < ast.getChildCount(); i++) {
       int explainOptions = ast.getChild(i).getType();
       if (explainOptions == HiveParser.KW_FORMATTED) {
-        formatted = true;
+        config.setFormatted(true);
       } else if (explainOptions == HiveParser.KW_EXTENDED) {
-        extended = true;
+        config.setExtended(true);
       } else if (explainOptions == HiveParser.KW_DEPENDENCY) {
-        dependency = true;
+        config.setDependency(true);
       } else if (explainOptions == HiveParser.KW_LOGICAL) {
-        logical = true;
+        config.setLogical(true);
       } else if (explainOptions == HiveParser.KW_AUTHORIZATION) {
-        authorize = true;
+        config.setAuthorize(true);
+      } else if (explainOptions == HiveParser.KW_ANALYZE) {
+        config.setAnalyze(AnalyzeState.RUNNING);
+        config.setExplainRootPath(ctx.getMRTmpPath());
       }
     }

-    ctx.setExplain(true);
-    ctx.setExplainLogical(logical);
+    ctx.setExplainConfig(config);

-    // Create a semantic analyzer for the query
     ASTNode input = (ASTNode) ast.getChild(0);
+    // explain analyze is composed of two steps
+    // step 1 (ANALYZE_STATE.RUNNING), run the query and collect the runtime #rows
+    // step 2 (ANALYZE_STATE.ANALYZING), explain the query and provide the runtime #rows collected.
+    if (config.getAnalyze() == AnalyzeState.RUNNING) {
+      String query = ctx.getTokenRewriteStream().toString(input.getTokenStartIndex(),
+          input.getTokenStopIndex());
+      LOG.info("Explain analyze (running phase) for query " + query);
+      Context runCtx = null;
+      try {
+        runCtx = new Context(conf);
+        // runCtx and ctx share the configuration
+        runCtx.setExplainConfig(config);
+        Driver driver = new Driver(conf, runCtx);
+        driver.run(query);
+        // Note that we need to call getResults for simple fetch optimization.
+        // However, we need to skip all the results.
+        while (driver.getResults(new ArrayList<String>())) {
+        }
+        config.setOpIdToRuntimeNumRows(aggregateStats(config.getExplainRootPath()));
+      } catch (IOException e1) {
+        throw new SemanticException(e1);
+      } catch (CommandNeedRetryException e) {
+        throw new SemanticException(e);
+      }
+      ctx.resetOpContext();
+      ctx.resetStream();
+      TaskFactory.resetId();
+      LOG.info("Explain analyze (analyzing phase) for query " + query);
+      config.setAnalyze(AnalyzeState.ANALYZING);
+    }
     BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, input);
     sem.analyze(input, ctx);
     sem.validate();
@@ -92,24 +141,20 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
       pCtx = ((SemanticAnalyzer)sem).getParseContext();
     }

-    boolean userLevelExplain = !extended
-        && !formatted
-        && !dependency
-        && !logical
-        && !authorize
+    config.setUserLevelExplain(!config.isExtended()
+        && !config.isFormatted()
+        && !config.isDependency()
+        && !config.isLogical()
+        && !config.isAuthorize()
         && (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_EXPLAIN_USER) && HiveConf
-            .getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"));
+            .getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")));
+
     ExplainWork work = new ExplainWork(ctx.getResFile(),
         pCtx,
         tasks,
         fetchTask,
         sem,
-        extended,
-        formatted,
-        dependency,
-        logical,
-        authorize,
-        userLevelExplain,
+        config,
         ctx.getCboInfo());

     work.setAppendTaskType(
@@ -121,6 +166,43 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
     rootTasks.add(explTask);
   }

+  private Map<String, Long> aggregateStats(Path localTmpPath) {
+    Map<String, Long> opIdToRuntimeNumRows = new HashMap<String, Long>();
+    // localTmpPath is the root of all the stats.
+    // Under it, there will be SEL_1/statsfiles, SEL_2/statsfiles etc where SEL_1 and SEL_2 are the op ids.
+    FileSystem fs;
+    FileStatus[] statuses = null;
+    try {
+      fs = localTmpPath.getFileSystem(conf);
+      statuses = fs.listStatus(localTmpPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      // statuses can be null if it is DDL, etc
+    } catch (IOException e) {
+      LOG.warn(e.toString());
+    }
+    if (statuses != null) {
+      for (FileStatus status : statuses) {
+        if (status.isDir()) {
+          StatsCollectionContext scc = new StatsCollectionContext(conf);
+          String[] names = status.getPath().toString().split(Path.SEPARATOR);
+          String opId = names[names.length - 1];
+          scc.setStatsTmpDir(status.getPath().toString());
+          StatsAggregator statsAggregator = new FSStatsAggregator();
+          if (!statsAggregator.connect(scc)) {
+            // -1 means that there is no stats
+            opIdToRuntimeNumRows.put(opId, -1L);
+          } else {
+            String value = statsAggregator.aggregateStats("", StatsSetupConst.RUN_TIME_ROW_COUNT);
+            opIdToRuntimeNumRows.put(opId, Long.parseLong(value));
+          }
+          if (statsAggregator != null) {
+            statsAggregator.closeConnection(scc);
+          }
+        }
+      }
+    }
+    return opIdToRuntimeNumRows;
+  }
+
   @Override
   public List<FieldSchema> getResultSchema() {
     return fieldList;
@@ -133,4 +215,5 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
     Task task = rootTasks.get(0);
     return task instanceof ExplainTask && ((ExplainTask)task).getWork().isAuthorize();
   }
+
 }
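Putting the two-step comment above into practice: the RUNNING phase re-executes the query through a fresh Driver and harvests per-operator RUN_TIME_ROW_COUNT files via aggregateStats(); the ANALYZING phase then re-compiles the statement so AnnotateRunTimeStatsOptimizer can attach the observed counts to the plan that ExplainTask prints. A hedged end-to-end sketch (the table name src is illustrative):

    Driver driver = new Driver(new HiveConf());
    try {
      // phase 1 runs the query; phase 2 prints the plan annotated with
      // the per-operator row counts observed at run time
      driver.run("EXPLAIN ANALYZE SELECT key, count(*) FROM src GROUP BY key");
    } catch (CommandNeedRetryException e) {
      // retry handling elided in this sketch
    }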
http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index bb9b53d..c88d537 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -194,7 +194,7 @@ public class GenTezUtils {
   }

   // removes any union operator and clones the plan
-  public static void removeUnionOperators(GenTezProcContext context, BaseWork work)
+  public static void removeUnionOperators(GenTezProcContext context, BaseWork work, int indexForTezUnion)
     throws SemanticException {

     List<Operator<?>> roots = new ArrayList<Operator<?>>();
@@ -205,7 +205,7 @@ public class GenTezUtils {
     roots.addAll(context.eventOperatorSet);

     // need to clone the plan.
-    List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(roots);
+    List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(roots, indexForTezUnion);

     // we're cloning the operator plan but we're retaining the original work. That means
     // that root operators have to be replaced with the cloned ops. The replacement map
@@ -294,8 +294,7 @@ public class GenTezUtils {
       linked = context.linkedFileSinks.get(path);
       linked.add(desc);

-      desc.setIndexInTezUnion(linked.size());
-      desc.setDirName(new Path(path, "" + desc.getIndexInTezUnion()));
+      desc.setDirName(new Path(path, "" + linked.size()));
       desc.setLinkedFileSink(true);
       desc.setParentDir(path);
       desc.setLinkedFileSinkDesc(linked);

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index c85efec..00b872d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -735,7 +735,7 @@ explainStatement
 explainOption
 @init { msgs.push("explain option"); }
 @after { msgs.pop(); }
-    : KW_EXTENDED|KW_FORMATTED|KW_DEPENDENCY|KW_LOGICAL|KW_AUTHORIZATION
+    : KW_EXTENDED|KW_FORMATTED|KW_DEPENDENCY|KW_LOGICAL|KW_AUTHORIZATION|KW_ANALYZE
     ;

 execStatement

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
index 5b08ed2..d7a56e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
@@ -178,7 +178,7 @@ public class MapReduceCompiler extends TaskCompiler {
       throws SemanticException {

     // bypass for explain queries for now
-    if (ctx.getExplain()) {
+    if (ctx.isExplainSkipExecution()) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 252a916..6a66691 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -138,6 +138,7 @@ import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec.SpecType;
 import org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFInputSpec;
@@ -7246,7 +7247,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       LOG.info("Generate an operator pipleline to autogather column stats for table " + tableName
           + " in query " + ctx.getCmd());
       ColumnStatsAutoGatherContext columnStatsAutoGatherContext = null;
-      columnStatsAutoGatherContext = new ColumnStatsAutoGatherContext(this, conf, curr, table, partSpec, isInsertInto);
+      columnStatsAutoGatherContext = new ColumnStatsAutoGatherContext(this, conf, curr, table, partSpec, isInsertInto, ctx);
       columnStatsAutoGatherContext.insertAnalyzePipeline();
       columnStatsAutoGatherContexts.add(columnStatsAutoGatherContext);
     }
@@ -11076,10 +11077,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {

     // 5. Take care of view creation
     if (createVwDesc != null) {
-      if (!ctx.isCboSucceeded()) {
-        saveViewDefinition();
+      if (ctx.getExplainAnalyze() == AnalyzeState.RUNNING) {
+        return;
       }

+      if (!ctx.isCboSucceeded()) {
+        saveViewDefinition();
+      }
+
       // validate the create view statement at this point, the createVwDesc gets
       // all the information for semanticcheck
       validateCreateView(createVwDesc);
@@ -11168,7 +11173,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }

     // 11. if desired check we're not going over partition scan limits
-    if (!ctx.getExplain()) {
+    if (!ctx.isExplainSkipExecution()) {
       enforceScanLimits(pCtx, origFetchTask);
     }

@@ -11964,7 +11969,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     case CTAS: // create table as select
       if (isTemporary) {
-        if (!ctx.getExplain() && !isMaterialization) {
+        if (!ctx.isExplainSkipExecution() && !isMaterialization) {
           String dbName = qualifiedTabName[0];
           String tblName = qualifiedTabName[1];
           SessionState ss = SessionState.get();
@@ -11983,7 +11988,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         // dumpTable is only used to check the conflict for non-temporary tables
         try {
           Table dumpTable = db.newTable(dbDotTab);
-          if (null != db.getTable(dumpTable.getDbName(), dumpTable.getTableName(), false) && !ctx.getExplain()) {
+          if (null != db.getTable(dumpTable.getDbName(), dumpTable.getTableName(), false) && !ctx.isExplainSkipExecution()) {
            throw new SemanticException(ErrorMsg.TABLE_ALREADY_EXISTS.getMsg(dbDotTab));
           }
         } catch (HiveException e) {