http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 1688524..3cf8b71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -35,7 +35,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -46,6 +48,9 @@ import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -77,6 +82,9 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   protected final AtomicBoolean abortOp;
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
+  protected transient long runTimeNumRows;
+  protected int indexForTezUnion = -1;
+  private transient Configuration hconf;
   protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();
 
   // It can be optimized later so that an operator operator (init/close) is performed
@@ -476,7 +484,9 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
    * Operator specific initialization.
    */
   protected void initializeOp(Configuration hconf) throws HiveException {
+    this.hconf = hconf;
     rootInitializeCalled = true;
+    runTimeNumRows = 0;
   }
 
   /**
@@ -715,6 +725,10 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
    * should overwrite this funtion for their specific cleanup routine.
    */
   protected void closeOp(boolean abort) throws HiveException {
+    if (conf != null && conf.getRuntimeStatsTmpDir() != null) {
+      publishRunTimeStats();
+    }
+    runTimeNumRows = 0;
   }
 
   private boolean jobCloseDone = false;
@@ -869,7 +883,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
 
   protected void forward(Object row, ObjectInspector rowInspector)
       throws HiveException {
-
+    runTimeNumRows++;
     if (getDone()) {
       return;
     }
@@ -1425,4 +1439,38 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   public CompilationOpContext getCompilationOpContext() {
     return cContext;
   }
+
+  private void publishRunTimeStats() throws HiveException {
+    StatsPublisher statsPublisher = new FSStatsPublisher();
+    StatsCollectionContext sContext = new StatsCollectionContext(hconf);
+    sContext.setIndexForTezUnion(indexForTezUnion);
+    sContext.setStatsTmpDir(conf.getRuntimeStatsTmpDir());
+
+    if (!statsPublisher.connect(sContext)) {
+      LOG.error("StatsPublishing error: cannot connect to database");
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
+    }
+
+    String prefix = "";
+    Map<String, String> statsToPublish = new HashMap<String, String>();
+    statsToPublish.put(StatsSetupConst.RUN_TIME_ROW_COUNT, Long.toString(runTimeNumRows));
+    if (!statsPublisher.publishStat(prefix, statsToPublish)) {
+      // The original exception is lost.
+      // Not changing the interface to maintain backward compatibility
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg());
+    }
+    if (!statsPublisher.closeConnection(sContext)) {
+      // The original exception is lost.
+      // Not changing the interface to maintain backward compatibility
+      throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg());
+    }
+  }
+
+  public int getIndexForTezUnion() {
+    return indexForTezUnion;
+  }
+
+  public void setIndexForTezUnion(int indexForTezUnion) {
+    this.indexForTezUnion = indexForTezUnion;
+  }
 }
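
For context, the row-count plumbing added above reuses Hive's filesystem-based stats publisher and aggregator. The following is a minimal sketch, not part of the patch: the configuration, temp directory, and row count are invented, but every call mirrors what publishRunTimeStats() (publish side) and the EXPLAIN ANALYZE aggregation (read side) do with APIs shown in this commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.stats.fs.FSStatsAggregator;
import org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher;

public class RunTimeRowCountRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Invented directory; in the patch it comes from OperatorDesc.getRuntimeStatsTmpDir().
    String statsTmpDir = "/tmp/explain-analyze/SEL_1";

    // Publish side: roughly what Operator.publishRunTimeStats() does when closeOp() runs.
    StatsPublisher publisher = new FSStatsPublisher();
    StatsCollectionContext publishCtx = new StatsCollectionContext(conf);
    publishCtx.setStatsTmpDir(statsTmpDir);
    if (publisher.init(publishCtx) && publisher.connect(publishCtx)) {
      Map<String, String> stats = new HashMap<String, String>();
      stats.put(StatsSetupConst.RUN_TIME_ROW_COUNT, Long.toString(12345L));  // invented count
      publisher.publishStat("", stats);
      publisher.closeConnection(publishCtx);
    }

    // Read side: roughly what ExplainSemanticAnalyzer.aggregateStats() does per operator id.
    StatsAggregator aggregator = new FSStatsAggregator();
    StatsCollectionContext aggCtx = new StatsCollectionContext(conf);
    aggCtx.setStatsTmpDir(statsTmpDir);
    if (aggregator.connect(aggCtx)) {
      String rows = aggregator.aggregateStats("", StatsSetupConst.RUN_TIME_ROW_COUNT);
      System.out.println("runtime rows = " + rows);
      aggregator.closeConnection(aggCtx);
    }
  }
}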

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index fa23b01..c4490f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -548,6 +548,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
     // forward is not called
     if (null != out) {
       numRows++;
+      runTimeNumRows++;
       if (isLogInfoEnabled) {
         if (numRows == cntr) {
           cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index 277683e..247d589 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -546,6 +546,28 @@ public class SerializationUtilities {
     return result;
   }
 
+  public static List<Operator<?>> cloneOperatorTree(List<Operator<?>> roots, int indexForTezUnion) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    CompilationOpContext ctx = roots.isEmpty() ? null : roots.get(0).getCompilationOpContext();
+    serializePlan(roots, baos, true);
+    @SuppressWarnings("unchecked")
+    List<Operator<?>> result =
+        deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+            roots.getClass(), true);
+    // Restore the context.
+    LinkedList<Operator<?>> newOps = new LinkedList<>(result);
+    while (!newOps.isEmpty()) {
+      Operator<?> newOp = newOps.poll();
+      newOp.setIndexForTezUnion(indexForTezUnion);
+      newOp.setCompilationOpContext(ctx);
+      List<Operator<?>> children = newOp.getChildOperators();
+      if (children != null) {
+        newOps.addAll(children);
+      }
+    }
+    return result;
+  }
+
   /**
    * Clones using the powers of XML. Do not use unless necessary.
    * @param plan The plan.
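
The new overload serializes and deserializes the roots like the existing cloneOperatorTree(roots), then walks the clones breadth-first to restore the compilation context and stamp each one with the given Tez union index. A hypothetical usage fragment (the branch index is invented; the real caller is removeUnionOperators in GenTezUtils, and only one source of roots is shown):

// Sketch only: clone the plan fragment for one branch of a Tez union and tag the clones.
List<Operator<?>> roots = new ArrayList<Operator<?>>();
roots.addAll(context.eventOperatorSet);   // as in removeUnionOperators; other roots omitted here
int indexForTezUnion = 1;                 // invented branch position
List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(roots, indexForTezUnion);
// Every operator reachable from newRoots now reports getIndexForTezUnion() == 1.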

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 602ed1a..483fc3a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
@@ -87,7 +88,9 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
 
   @Override
   public int execute(DriverContext driverContext) {
-
+    if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) {
+      return 0;
+    }
     LOG.info("Executing stats task");
     // Make sure that it is either an ANALYZE, INSERT OVERWRITE (maybe load) or CTAS command
     short workComponentsPresent = 0;
@@ -147,7 +150,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
       if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
         try {
           scc = getContext();
-          statsAggregator = createStatsAggregator(scc);
+          statsAggregator = createStatsAggregator(scc, conf);
         } catch (HiveException e) {
           if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
             throw e;
@@ -294,7 +297,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
     return prefix;
   }
 
-  private StatsAggregator createStatsAggregator(StatsCollectionContext scc) throws HiveException {
+  private StatsAggregator createStatsAggregator(StatsCollectionContext scc, HiveConf conf) throws HiveException {
     String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
     StatsFactory factory = StatsFactory.newFactory(statsImpl, conf);
     if (factory == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 6afe957..0f02222 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -267,6 +267,7 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
         publishStats();
       }
     }
+    super.closeOp(abort);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
index a75b52a..0d1fa31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
@@ -150,5 +150,6 @@ public class UDTFOperator extends Operator<UDTFDesc> implements Serializable {
   @Override
   protected void closeOp(boolean abort) throws HiveException {
     conf.getGenericUDTF().close();
+    super.closeOp(abort);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 5b8afb5..2dd03f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
@@ -137,98 +138,111 @@ public class ATSHook implements ExecuteWithHookContext {
 
   @Override
   public void run(final HookContext hookContext) throws Exception {
-    final long currentTime = System.currentTimeMillis();
-    final HiveConf conf = new HiveConf(hookContext.getConf());
-    final QueryState queryState = hookContext.getQueryState();
-    final String queryId = queryState.getQueryId();
-
-    final Map<String, Long> durations = new HashMap<String, Long>();
-    for (String key : hookContext.getPerfLogger().getEndTimes().keySet()) {
-      durations.put(key, hookContext.getPerfLogger().getDuration(key));
-    }
-
-    try {
-      setupAtsExecutor(conf);
+          final long currentTime = System.currentTimeMillis();
+          final HiveConf conf = new HiveConf(hookContext.getConf());
+          final QueryState queryState = hookContext.getQueryState();
+          final String queryId = queryState.getQueryId();
+
+          final Map<String, Long> durations = new HashMap<String, Long>();
+          for (String key : hookContext.getPerfLogger().getEndTimes().keySet()) {
+                  durations.put(key, hookContext.getPerfLogger().getDuration(key));
+          }
 
-      executor.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              QueryPlan plan = hookContext.getQueryPlan();
-              if (plan == null) {
-                return;
-              }
-              String queryId = plan.getQueryId();
-              String opId = hookContext.getOperationId();
-              long queryStartTime = plan.getQueryStartTime();
-              String user = hookContext.getUgi().getShortUserName();
-              String requestuser = hookContext.getUserName();
-              if (hookContext.getUserName() == null ){
-                requestuser = hookContext.getUgi().getUserName() ;
-              }
-              int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
-              int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
-              if (numMrJobs + numTezJobs <= 0) {
-                return; // ignore client only queries
-              }
-
-              switch(hookContext.getHookType()) {
-              case PRE_EXEC_HOOK:
-                ExplainWork work = new ExplainWork(null,// resFile
-                    null,// pCtx
-                    plan.getRootTasks(),// RootTasks
-                    plan.getFetchTask(),// FetchTask
-                    null,// analyzer
-                    false,// extended
-                    true,// formatted
-                    false,// dependency
-                    false,// logical
-                    false,// authorize
-                    false,// userLevelExplain
-                    null// cboInfo
-                );
-                @SuppressWarnings("unchecked")
-                ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
-                explain.initialize(queryState, plan, null, null);
-                String query = plan.getQueryStr();
-                JSONObject explainPlan = explain.getJSONPlan(null, work);
-                String logID = conf.getLogIdVar(hookContext.getSessionId());
-                List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
-                List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
-                String executionMode = getExecutionMode(plan).name();
-                String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
-                if (hiveInstanceAddress == null) {
-                  hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
-                }
-                String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
-                ApplicationId llapId = determineLlapId(conf, plan);
-                fireAndForget(
-                    createPreHookEvent(queryId, query, explainPlan, queryStartTime,
-                        user, requestuser, numMrJobs, numTezJobs, opId,
-                        hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
-                        hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode,
-                        tablesRead, tablesWritten, conf));
-                break;
-              case POST_EXEC_HOOK:
-                fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, durations));
-                break;
-              case ON_FAILURE_HOOK:
-                fireAndForget(createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, durations));
-                break;
-              default:
-                //ignore
-                break;
-              }
-            } catch (Exception e) {
-              LOG.warn("Failed to submit plan to ATS for " + queryId, e);
-            }
+          try {
+                  setupAtsExecutor(conf);
+
+                  executor.submit(new Runnable() {
+                          @Override
+                          public void run() {
+                                  try {
+                                          QueryPlan plan = hookContext.getQueryPlan();
+                                          if (plan == null) {
+                                                  return;
+                                          }
+                                          String queryId = plan.getQueryId();
+                                          String opId = hookContext.getOperationId();
+                                          long queryStartTime = plan.getQueryStartTime();
+                                          String user = hookContext.getUgi().getUserName();
+                                          String requestuser = hookContext.getUserName();
+                                          if (hookContext.getUserName() == null) {
+                                                  requestuser = hookContext.getUgi().getUserName();
+                                          }
+                                          int numMrJobs = Utilities.getMRTasks(
+                                                          plan.getRootTasks()).size();
+                                          int numTezJobs = Utilities.getTezTasks(
+                                                          plan.getRootTasks()).size();
+                                          if (numMrJobs + numTezJobs <= 0) {
+                                                  return; // ignore client only queries
+                                          }
+
+                                          switch (hookContext.getHookType()) {
+                                          case PRE_EXEC_HOOK:
+                                                  ExplainConfiguration config = new ExplainConfiguration();
+                                                  config.setFormatted(true);
+                                                  ExplainWork work = new ExplainWork(null,// resFile
+                                                                  null,// pCtx
+                                                                  plan.getRootTasks(),// RootTasks
+                                                                  plan.getFetchTask(),// FetchTask
+                                                                  null,// analyzer
+                                                                  config, // explainConfig
+                                                                  null// cboInfo
+                                                  );
+                                                  @SuppressWarnings("unchecked")
+                                                  ExplainTask explain = (ExplainTask) TaskFactory
+                                                                  .get(work, conf);
+                                                  explain.initialize(queryState, plan, null, null);
+                                                  String query = plan.getQueryStr();
+                                                  JSONObject explainPlan = explain.getJSONPlan(null,
+                                                                  work);
+                                                  String logID = conf.getLogIdVar(hookContext.getSessionId());
+                                                  List<String> tablesRead = getTablesFromEntitySet(hookContext
+                                                                  .getInputs());
+                                                  List<String> tablesWritten = getTablesFromEntitySet(hookContext
+                                                                  .getOutputs());
+                                                  String executionMode = getExecutionMode(plan)
+                                                                  .name();
+                                                  String hiveInstanceAddress = hookContext
+                                                                  .getHiveInstanceAddress();
+                                                  if (hiveInstanceAddress == null) {
+                                                          hiveInstanceAddress = InetAddress
+                                                                          .getLocalHost().getHostAddress();
+                                                  }
+                                                  String hiveInstanceType = hookContext
+                                                                  .isHiveServerQuery() ? "HS2" : "CLI";
+                                                  ApplicationId llapId = determineLlapId(conf, plan);
+                                                  fireAndForget(createPreHookEvent(queryId, query,
+                                                                  explainPlan, queryStartTime, user,
+                                                                  requestuser, numMrJobs, numTezJobs, opId,
+                                                                  hookContext.getIpAddress(),
+                                                                  hiveInstanceAddress, hiveInstanceType,
+                                                                  hookContext.getSessionId(),
+                                                                  logID,
+                                                                  hookContext.getThreadId(), executionMode,
+                                                                  tablesRead, tablesWritten, conf));
+                                                  break;
+                                          case POST_EXEC_HOOK:
+                                                  fireAndForget(createPostHookEvent(queryId,
+                                                                  currentTime, user, requestuser, true, opId,
+                                                                  durations));
+                                                  break;
+                                          case ON_FAILURE_HOOK:
+                                                  fireAndForget(createPostHookEvent(queryId,
+                                                                  currentTime, user, requestuser, false,
+                                                                  opId, durations));
+                                                  break;
+                                          default:
+                                                  // ignore
+                                                  break;
+                                          }
+                                  } catch (Exception e) {
+                                          LOG.warn("Failed to submit to ATS for " + queryId, e);
+                                  }
+                          }
+                  });
+          } catch (Exception e) {
+                  LOG.warn("Failed to submit to ATS for " + queryId, e);
           }
-        });
-    } catch (Exception e) {
-      LOG.warn("Failed to submit to ATS for " + queryId, e);
-    }
   }
-
   protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
     List<String> tableNames = new ArrayList<String>();
     for (Entity entity : entities) {

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 8c44933..c6287e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -219,7 +219,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
       transformations.add(new StatsOptimizer());
     }
-    if (pctx.getContext().getExplain() && !isTezExecEngine && !isSparkExecEngine) {
+    if (pctx.getContext().isExplainSkipExecution() && !isTezExecEngine && !isSparkExecEngine) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }
@@ -231,6 +231,7 @@ public class Optimizer {
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEFETCHTASKAGGR)) {
       transformations.add(new SimpleFetchAggregation());
     }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
new file mode 100644
index 0000000..ee67443
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AnnotateRunTimeStatsOptimizer.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.ql.stats.fs.FSStatsPublisher;
+
+public class AnnotateRunTimeStatsOptimizer implements PhysicalPlanResolver {
+  private static final Logger LOG = LoggerFactory.getLogger(AnnotateRunTimeStatsOptimizer.class);
+
+  private class AnnotateRunTimeStatsDispatcher implements Dispatcher {
+
+    private final PhysicalContext physicalContext;
+
+    public AnnotateRunTimeStatsDispatcher(PhysicalContext context, Map<Rule, NodeProcessor> rules) {
+      super();
+      physicalContext = context;
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      Set<Operator<? extends OperatorDesc>> ops = new HashSet<>();
+
+      if (currTask instanceof MapRedTask) {
+        MapRedTask mr = (MapRedTask) currTask;
+        ops.addAll(mr.getWork().getAllOperators());
+      } else if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w : work.getAllWork()) {
+          ops.addAll(w.getAllOperators());
+        }
+      } else if (currTask instanceof SparkTask) {
+        SparkWork sparkWork = (SparkWork) currTask.getWork();
+        for (BaseWork w : sparkWork.getAllWork()) {
+          ops.addAll(w.getAllOperators());
+        }
+      }
+
+      setOrAnnotateStats(ops, physicalContext.getParseContext());
+      return null;
+    }
+
+  }
+
+  public static void setOrAnnotateStats(Set<Operator<? extends OperatorDesc>> ops, ParseContext pctx)
+      throws SemanticException {
+    for (Operator<? extends OperatorDesc> op : ops) {
+      if (pctx.getContext().getExplainAnalyze() == AnalyzeState.RUNNING) {
+        setRuntimeStatsDir(op, pctx);
+      } else if (pctx.getContext().getExplainAnalyze() == AnalyzeState.ANALYZING) {
+        annotateRuntimeStats(op, pctx);
+      } else {
+        throw new SemanticException("Unexpected stats in AnnotateWithRunTimeStatistics.");
+      }
+    }
+  }
+
+  private static void setRuntimeStatsDir(Operator<? extends OperatorDesc> op, ParseContext pctx)
+      throws SemanticException {
+    try {
+      OperatorDesc conf = op.getConf();
+      if (conf != null) {
+        LOG.info("setRuntimeStatsDir for " + op.getOperatorId());
+        String path = new Path(pctx.getContext().getExplainConfig().getExplainRootPath(),
+            op.getOperatorId()).toString();
+        StatsPublisher statsPublisher = new FSStatsPublisher();
+        StatsCollectionContext runtimeStatsContext = new StatsCollectionContext(pctx.getConf());
+        runtimeStatsContext.setStatsTmpDir(path);
+        if (!statsPublisher.init(runtimeStatsContext)) {
+          LOG.error("StatsPublishing error: StatsPublisher is not 
initialized.");
+          throw new 
HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
+        }
+        conf.setRuntimeStatsTmpDir(path);
+      } else {
+        LOG.debug("skip setRuntimeStatsDir for " + op.getOperatorId()
+            + " because OperatorDesc is null");
+      }
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private static void annotateRuntimeStats(Operator<? extends OperatorDesc> op, ParseContext pctx) {
+    Long runTimeNumRows = pctx.getContext().getExplainConfig().getOpIdToRuntimeNumRows()
+        .get(op.getOperatorId());
+    if (op.getConf() != null && op.getConf().getStatistics() != null && runTimeNumRows != null) {
+      LOG.info("annotateRuntimeStats for " + op.getOperatorId());
+      op.getConf().getStatistics().setRunTimeNumRows(runTimeNumRows);
+    } else {
+      LOG.debug("skip annotateRuntimeStats for " + op.getOperatorId());
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    Dispatcher disp = new AnnotateRunTimeStatsDispatcher(pctx, opRules);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  public void resolve(Set<Operator<?>> opSet, ParseContext pctx) throws SemanticException {
+    Set<Operator<?>> ops = getAllOperatorsForSimpleFetch(opSet);
+    setOrAnnotateStats(ops, pctx);
+  }
+
+  private Set<Operator<?>> getAllOperatorsForSimpleFetch(Set<Operator<?>> opSet) {
+    Set<Operator<?>> returnSet = new LinkedHashSet<Operator<?>>();
+    Stack<Operator<?>> opStack = new Stack<Operator<?>>();
+    // add all children
+    opStack.addAll(opSet);
+    while (!opStack.empty()) {
+      Operator<?> op = opStack.pop();
+      returnSet.add(op);
+      if (op.getChildOperators() != null) {
+        opStack.addAll(op.getChildOperators());
+      }
+    }
+    return returnSet;
+  }
+}
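
In the RUNNING phase this resolver gives every operator its own stats scratch directory under the explain root path; in the ANALYZING phase it copies the collected counts onto the operator statistics. The fragment below is only an illustration of the directory convention (the root path and operator id are invented); setRuntimeStatsDir() builds exactly this kind of path before handing it to FSStatsPublisher.init().

import org.apache.hadoop.fs.Path;

public class RuntimeStatsDirLayout {
  public static void main(String[] args) {
    Path explainRootPath = new Path("/tmp/hive/explain-analyze-scratch");  // invented scratch dir
    String operatorId = "GBY_4";                                           // invented operator id
    // setRuntimeStatsDir() stores this path on the OperatorDesc; Operator.closeOp() later
    // publishes RUN_TIME_ROW_COUNT into it, and ExplainSemanticAnalyzer.aggregateStats()
    // lists these per-operator directories to rebuild the opId -> row count map.
    String statsTmpDir = new Path(explainRootPath, operatorId).toString();
    System.out.println(statsTmpDir);  // e.g. /tmp/hive/explain-analyze-scratch/GBY_4
  }
}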

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 49706b1..9377563 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -88,12 +88,17 @@ public class PhysicalOptimizer {
     // Vectorization should be the last optimization, because it doesn't modify the plan
     // or any operators. It makes a very low level transformation to the expressions to
     // run in the vectorized mode.
-    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)
+        && pctx.getContext().getExplainAnalyze() == null) {
       resolvers.add(new Vectorizer());
     }
     if (!"none".equalsIgnoreCase(hiveConf.getVar(HiveConf.ConfVars.HIVESTAGEIDREARRANGE))) {
       resolvers.add(new StageIDsRearranger());
     }
+
+    if (pctx.getContext().getExplainAnalyze() != null) {
+      resolvers.add(new AnnotateRunTimeStatsOptimizer());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
index c3e8bf5..80e62c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsAutoGatherContext.java
@@ -67,10 +67,11 @@ public class ColumnStatsAutoGatherContext {
   private boolean isInsertInto;
   private Table tbl;
   private Map<String, String> partSpec;
+  private Context origCtx;
   
   public ColumnStatsAutoGatherContext(SemanticAnalyzer sa, HiveConf conf,
       Operator<? extends OperatorDesc> op, Table tbl, Map<String, String> partSpec,
-      boolean isInsertInto) throws SemanticException {
+      boolean isInsertInto, Context ctx) throws SemanticException {
     super();
     this.sa = sa;
     this.conf = conf;
@@ -78,6 +79,7 @@ public class ColumnStatsAutoGatherContext {
     this.tbl = tbl;
     this.partSpec = partSpec;
     this.isInsertInto = isInsertInto;
+    this.origCtx = ctx;
     columns = tbl.getCols();
     partitionColumns = tbl.getPartCols();
   }
@@ -107,7 +109,7 @@ public class ColumnStatsAutoGatherContext {
     // 2. Based on the statement, generate the selectOperator
     Operator<?> selOp = null;
     try {
-      selOp = genSelOpForAnalyze(analyzeCommand);
+      selOp = genSelOpForAnalyze(analyzeCommand, origCtx);
     } catch (IOException | ParseException e) {
       throw new SemanticException(e);
     }
@@ -126,10 +128,13 @@ public class ColumnStatsAutoGatherContext {
   }
   
   @SuppressWarnings("rawtypes")
-  private Operator genSelOpForAnalyze(String analyzeCommand) throws IOException, ParseException, SemanticException{
+  private Operator genSelOpForAnalyze(String analyzeCommand, Context origCtx) throws IOException, ParseException, SemanticException{
     //0. initialization
     Context ctx = new Context(conf);
-    ASTNode tree = ParseUtils.parse(analyzeCommand, ctx);
+    ctx.setExplainConfig(origCtx.getExplainConfig());
+    ParseDriver pd = new ParseDriver();
+    ASTNode tree = pd.parse(analyzeCommand, ctx);
+    tree = ParseUtils.findRootNonNullToken(tree);
 
     //1. get the ColumnStatsSemanticAnalyzer
     BaseSemanticAnalyzer baseSem = SemanticAnalyzerFactory.get(new QueryState(conf), tree);

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
index 242a880..93b8183 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
@@ -405,6 +405,7 @@ public class ColumnStatsSemanticAnalyzer extends SemanticAnalyzer {
       analyzeRewrite.setColType(colType);
       qbp.setAnalyzeRewrite(analyzeRewrite);
       initCtx(ctx);
+      ctx.setExplainConfig(origCtx.getExplainConfig());
       LOG.info("Invoking analyze on rewritten query");
       analyzeInternal(rewrittenTree);
     } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java
new file mode 100644
index 0000000..4a8ff15
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainConfiguration.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * ExplainConfiguration
+ */
+
+public class ExplainConfiguration {
+  private boolean extended = false;
+  private boolean formatted = false;
+  private boolean dependency = false;
+  private boolean logical = false;
+  private boolean authorize = false;
+  private boolean userLevelExplain = false;
+  private Path explainRootPath;
+  private Map<String, Long> opIdToRuntimeNumRows;
+
+  public enum AnalyzeState {
+    RUNNING, ANALYZING
+  };
+
+  private AnalyzeState analyze = null;
+
+  public boolean isExtended() {
+    return extended;
+  }
+
+  public void setExtended(boolean extended) {
+    this.extended = extended;
+  }
+
+  public boolean isFormatted() {
+    return formatted;
+  }
+
+  public void setFormatted(boolean formatted) {
+    this.formatted = formatted;
+  }
+
+  public boolean isDependency() {
+    return dependency;
+  }
+
+  public void setDependency(boolean dependency) {
+    this.dependency = dependency;
+  }
+
+  public boolean isLogical() {
+    return logical;
+  }
+
+  public void setLogical(boolean logical) {
+    this.logical = logical;
+  }
+
+  public boolean isAuthorize() {
+    return authorize;
+  }
+
+  public void setAuthorize(boolean authorize) {
+    this.authorize = authorize;
+  }
+
+  public AnalyzeState getAnalyze() {
+    return analyze;
+  }
+
+  public void setAnalyze(AnalyzeState analyze) {
+    this.analyze = analyze;
+  }
+
+  public boolean isUserLevelExplain() {
+    return userLevelExplain;
+  }
+
+  public void setUserLevelExplain(boolean userLevelExplain) {
+    this.userLevelExplain = userLevelExplain;
+  }
+
+  public Path getExplainRootPath() {
+    return explainRootPath;
+  }
+
+  public void setExplainRootPath(Path explainRootPath) {
+    this.explainRootPath = explainRootPath;
+  }
+
+  public Map<String, Long> getOpIdToRuntimeNumRows() {
+    return opIdToRuntimeNumRows;
+  }
+
+  public void setOpIdToRuntimeNumRows(Map<String, Long> opIdToRuntimeNumRows) {
+    this.opIdToRuntimeNumRows = opIdToRuntimeNumRows;
+  }
+
+}
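
ExplainConfiguration gathers what used to be a list of boolean flags on ExplainWork; the ATSHook and ExplainSemanticAnalyzer hunks now build one of these and pass it to the new ExplainWork(resFile, pCtx, rootTasks, fetchTask, analyzer, config, cboInfo) constructor. A minimal sketch of setting one up for the two EXPLAIN ANALYZE phases (the scratch path is invented; real code takes it from ctx.getMRTmpPath()):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;

public class ExplainConfigSketch {
  public static void main(String[] args) {
    ExplainConfiguration config = new ExplainConfiguration();
    config.setFormatted(true);
    // Phase 1 of EXPLAIN ANALYZE: run the query and collect runtime row counts here.
    config.setAnalyze(AnalyzeState.RUNNING);
    config.setExplainRootPath(new Path("/tmp/hive/explain-analyze-scratch"));  // invented path
    // ... query runs, per-operator stats are aggregated ...
    // Phase 2: switch to ANALYZING so the recompiled plan is annotated with the counts.
    config.setAnalyze(AnalyzeState.ANALYZING);
  }
}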

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
index 8d7fd92..7e24364 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
@@ -36,8 +36,7 @@ public class ExplainSQRewriteSemanticAnalyzer extends BaseSemanticAnalyzer {
   @Override
   public void analyzeInternal(ASTNode ast) throws SemanticException {
 
-
-    ctx.setExplain(true);
+    ctx.setExplainConfig(new ExplainConfiguration());
 
     // Create a semantic analyzer for the query
     ASTNode input = (ASTNode) ast.getChild(0);

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index 75753b0..e0a1d3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -18,18 +18,40 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-
+import java.util.Map;
+
+import org.antlr.runtime.TokenRewriteStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
+import org.apache.hadoop.hive.ql.stats.fs.FSStatsAggregator;
 
 /**
  * ExplainSemanticAnalyzer.
@@ -37,40 +59,67 @@ import org.apache.hadoop.hive.ql.plan.ExplainWork;
  */
 public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
   List<FieldSchema> fieldList;
+  ExplainConfiguration config;
 
   public ExplainSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
+    config = new ExplainConfiguration();
   }
 
   @SuppressWarnings("unchecked")
   @Override
   public void analyzeInternal(ASTNode ast) throws SemanticException {
-
-    boolean extended = false;
-    boolean formatted = false;
-    boolean dependency = false;
-    boolean logical = false;
-    boolean authorize = false;
     for (int i = 1; i < ast.getChildCount(); i++) {
       int explainOptions = ast.getChild(i).getType();
       if (explainOptions == HiveParser.KW_FORMATTED) {
-        formatted = true;
+        config.setFormatted(true);
       } else if (explainOptions == HiveParser.KW_EXTENDED) {
-        extended = true;
+        config.setExtended(true);
       } else if (explainOptions == HiveParser.KW_DEPENDENCY) {
-        dependency = true;
+        config.setDependency(true);
       } else if (explainOptions == HiveParser.KW_LOGICAL) {
-        logical = true;
+        config.setLogical(true);
       } else if (explainOptions == HiveParser.KW_AUTHORIZATION) {
-        authorize = true;
+        config.setAuthorize(true);
+      } else if (explainOptions == HiveParser.KW_ANALYZE) {
+        config.setAnalyze(AnalyzeState.RUNNING);
+        config.setExplainRootPath(ctx.getMRTmpPath());
       }
     }
 
-    ctx.setExplain(true);
-    ctx.setExplainLogical(logical);
+    ctx.setExplainConfig(config);
 
-    // Create a semantic analyzer for the query
     ASTNode input = (ASTNode) ast.getChild(0);
+    // explain analyze is composed of two steps
+    // step 1 (ANALYZE_STATE.RUNNING), run the query and collect the runtime #rows
+    // step 2 (ANALYZE_STATE.ANALYZING), explain the query and provide the runtime #rows collected.
+    if (config.getAnalyze() == AnalyzeState.RUNNING) {
+      String query = ctx.getTokenRewriteStream().toString(input.getTokenStartIndex(),
+          input.getTokenStopIndex());
+      LOG.info("Explain analyze (running phase) for query " + query);
+      Context runCtx = null;
+      try {
+        runCtx = new Context(conf);
+        // runCtx and ctx share the configuration
+        runCtx.setExplainConfig(config);
+        Driver driver = new Driver(conf, runCtx);
+        driver.run(query);
+          // Note that we need to call getResults for simple fetch optimization.
+          // However, we need to skip all the results.
+          while (driver.getResults(new ArrayList<String>())) {
+          }
+        config.setOpIdToRuntimeNumRows(aggregateStats(config.getExplainRootPath()));
+      } catch (IOException e1) {
+        throw new SemanticException(e1);
+      } catch (CommandNeedRetryException e) {
+        throw new SemanticException(e);
+      }
+      ctx.resetOpContext();
+      ctx.resetStream();
+      TaskFactory.resetId();
+      LOG.info("Explain analyze (analyzing phase) for query " + query);
+      config.setAnalyze(AnalyzeState.ANALYZING);
+    }
     BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, input);
     sem.analyze(input, ctx);
     sem.validate();
@@ -92,24 +141,20 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
       pCtx = ((SemanticAnalyzer)sem).getParseContext();
     }
 
-    boolean userLevelExplain = !extended
-        && !formatted
-        && !dependency
-        && !logical
-        && !authorize
+    config.setUserLevelExplain(!config.isExtended()
+        && !config.isFormatted()
+        && !config.isDependency()
+        && !config.isLogical()
+        && !config.isAuthorize()
         && (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_EXPLAIN_USER) && HiveConf
-            .getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"));
+            .getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")));
+
     ExplainWork work = new ExplainWork(ctx.getResFile(),
         pCtx,
         tasks,
         fetchTask,
         sem,
-        extended,
-        formatted,
-        dependency,
-        logical,
-        authorize,
-        userLevelExplain,
+        config,
         ctx.getCboInfo());
 
     work.setAppendTaskType(
@@ -121,6 +166,43 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
     rootTasks.add(explTask);
   }
 
+  private Map<String, Long> aggregateStats(Path localTmpPath) {
+    Map<String, Long> opIdToRuntimeNumRows = new HashMap<String, Long>();
+    // localTmpPath is the root of all the stats.
+    // Under it, there will be SEL_1/statsfiles, SEL_2/statsfiles etc where SEL_1 and SEL_2 are the op ids.
+    FileSystem fs;
+    FileStatus[] statuses = null;
+    try {
+      fs = localTmpPath.getFileSystem(conf);
+      statuses = fs.listStatus(localTmpPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      // statuses can be null if it is DDL, etc
+    } catch (IOException e) {
+      LOG.warn(e.toString());
+    }
+    if (statuses != null) {
+      for (FileStatus status : statuses) {
+        if (status.isDir()) {
+          StatsCollectionContext scc = new StatsCollectionContext(conf);
+          String[] names = status.getPath().toString().split(Path.SEPARATOR);
+          String opId = names[names.length - 1];
+          scc.setStatsTmpDir(status.getPath().toString());
+          StatsAggregator statsAggregator = new FSStatsAggregator();
+          if (!statsAggregator.connect(scc)) {
+            // -1 means that there is no stats
+            opIdToRuntimeNumRows.put(opId, -1L);
+          } else {
+            String value = statsAggregator.aggregateStats("", StatsSetupConst.RUN_TIME_ROW_COUNT);
+            opIdToRuntimeNumRows.put(opId, Long.parseLong(value));
+          }
+          if (statsAggregator != null) {
+            statsAggregator.closeConnection(scc);
+          }
+        }
+      }
+    }
+    return opIdToRuntimeNumRows;
+  }
+
   @Override
   public List<FieldSchema> getResultSchema() {
     return fieldList;
@@ -133,4 +215,5 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
     Task task = rootTasks.get(0);
     return task instanceof ExplainTask && ((ExplainTask)task).getWork().isAuthorize();
   }
+
 }
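
The hand-off between the two phases is the opIdToRuntimeNumRows map that aggregateStats() builds: keys are operator ids (the per-operator directory names under explainRootPath) and values are the published RUN_TIME_ROW_COUNT, with -1 standing in for "no stats found". The following fragment only illustrates the shape of that map; the operator ids and counts are invented.

import java.util.HashMap;
import java.util.Map;

public class OpIdToRuntimeNumRowsExample {
  public static void main(String[] args) {
    Map<String, Long> opIdToRuntimeNumRows = new HashMap<String, Long>();
    opIdToRuntimeNumRows.put("TS_0", 1000L);   // invented table scan row count
    opIdToRuntimeNumRows.put("SEL_1", 1000L);  // invented select row count
    opIdToRuntimeNumRows.put("RS_2", -1L);     // invented: operator whose stats dir held nothing
    // ExplainSemanticAnalyzer stores this map on the ExplainConfiguration via
    // setOpIdToRuntimeNumRows(); AnnotateRunTimeStatsOptimizer.annotateRuntimeStats()
    // later calls Statistics.setRunTimeNumRows() with the matching value per operator.
    System.out.println(opIdToRuntimeNumRows);
  }
}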

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index bb9b53d..c88d537 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -194,7 +194,7 @@ public class GenTezUtils {
   }
 
   // removes any union operator and clones the plan
-  public static void removeUnionOperators(GenTezProcContext context, BaseWork work)
+  public static void removeUnionOperators(GenTezProcContext context, BaseWork work, int indexForTezUnion)
     throws SemanticException {
 
     List<Operator<?>> roots = new ArrayList<Operator<?>>();
@@ -205,7 +205,7 @@ public class GenTezUtils {
     roots.addAll(context.eventOperatorSet);
 
     // need to clone the plan.
-    List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(roots);
+    List<Operator<?>> newRoots = SerializationUtilities.cloneOperatorTree(roots, indexForTezUnion);
 
     // we're cloning the operator plan but we're retaining the original work. That means
     // that root operators have to be replaced with the cloned ops. The replacement map
@@ -294,8 +294,7 @@ public class GenTezUtils {
         linked = context.linkedFileSinks.get(path);
         linked.add(desc);
 
-        desc.setIndexInTezUnion(linked.size());
-        desc.setDirName(new Path(path, "" + desc.getIndexInTezUnion()));
+        desc.setDirName(new Path(path, "" + linked.size()));
         desc.setLinkedFileSink(true);
         desc.setParentDir(path);
         desc.setLinkedFileSinkDesc(linked);

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index c85efec..00b872d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -735,7 +735,7 @@ explainStatement
 explainOption
 @init { msgs.push("explain option"); }
 @after { msgs.pop(); }
-    : KW_EXTENDED|KW_FORMATTED|KW_DEPENDENCY|KW_LOGICAL|KW_AUTHORIZATION
+    : KW_EXTENDED|KW_FORMATTED|KW_DEPENDENCY|KW_LOGICAL|KW_AUTHORIZATION|KW_ANALYZE
     ;
 
 execStatement

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
index 5b08ed2..d7a56e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
@@ -178,7 +178,7 @@ public class MapReduceCompiler extends TaskCompiler {
       throws SemanticException {
 
     // bypass for explain queries for now
-    if (ctx.getExplain()) {
+    if (ctx.isExplainSkipExecution()) {
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/831bd7d8/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 252a916..6a66691 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -138,6 +138,7 @@ import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec.SpecType;
 import org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFInputSpec;
@@ -7246,7 +7247,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     LOG.info("Generate an operator pipleline to autogather column stats for table " + tableName
         + " in query " + ctx.getCmd());
     ColumnStatsAutoGatherContext columnStatsAutoGatherContext = null;
-    columnStatsAutoGatherContext = new ColumnStatsAutoGatherContext(this, conf, curr, table, partSpec, isInsertInto);
+    columnStatsAutoGatherContext = new ColumnStatsAutoGatherContext(this, conf, curr, table, partSpec, isInsertInto, ctx);
     columnStatsAutoGatherContext.insertAnalyzePipeline();
     columnStatsAutoGatherContexts.add(columnStatsAutoGatherContext);
   }
@@ -11076,10 +11077,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     // 5. Take care of view creation
     if (createVwDesc != null) {
-      if (!ctx.isCboSucceeded()) {
-        saveViewDefinition();
+      if (ctx.getExplainAnalyze() == AnalyzeState.RUNNING) {
+        return;
       }
 
+      if (!ctx.isCboSucceeded()) {
+        saveViewDefinition();
+      }      
+      
       // validate the create view statement at this point, the createVwDesc gets
       // all the information for semanticcheck
       validateCreateView(createVwDesc);
@@ -11168,7 +11173,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     // 11. if desired check we're not going over partition scan limits
-    if (!ctx.getExplain()) {
+    if (!ctx.isExplainSkipExecution()) {
       enforceScanLimits(pCtx, origFetchTask);
     }
 
@@ -11964,7 +11969,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     case CTAS: // create table as select
 
       if (isTemporary) {
-        if (!ctx.getExplain() && !isMaterialization) {
+        if (!ctx.isExplainSkipExecution() && !isMaterialization) {
           String dbName = qualifiedTabName[0];
           String tblName = qualifiedTabName[1];
           SessionState ss = SessionState.get();
@@ -11983,7 +11988,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         // dumpTable is only used to check the conflict for non-temporary tables
         try {
           Table dumpTable = db.newTable(dbDotTab);
-          if (null != db.getTable(dumpTable.getDbName(), dumpTable.getTableName(), false) && !ctx.getExplain()) {
+          if (null != db.getTable(dumpTable.getDbName(), dumpTable.getTableName(), false) && !ctx.isExplainSkipExecution()) {
             throw new SemanticException(ErrorMsg.TABLE_ALREADY_EXISTS.getMsg(dbDotTab));
           }
         } catch (HiveException e) {
