[SPARK-8861][SPARK-8862][SQL] Add basic instrumentation to each SparkPlan 
operator and add a new SQL tab

This PR includes the following changes:

### SPARK-8862: Add basic instrumentation to each SparkPlan operator

A SparkPlan can override `def accumulators: Map[String, Accumulator[_]]` to 
expose its metrics that can be displayed in UI. The UI will use them to track 
the updates and show them in the web page in real-time.

### SparkSQLExecution and SQLSparkListener

`SparkSQLExecution.withNewExecutionId` will set `spark.sql.execution.id` to the 
local properties so that we can use it to track all jobs that belong to the 
same query.

SQLSparkListener is a listener to track all accumulator updates of all tasks 
for a query. It receives them from heartbeats can the UI can query them in 
real-time.

When running a query, `SQLSparkListener.onExecutionStart` will be called. When 
a query is finished,  `SQLSparkListener.onExecutionEnd` will be called. And the 
Spark jobs with the same execution id will be tracked and stored with this 
query.

`SQLSparkListener` has to store all accumulator updates for tasks separately. 
When a task fails and starts to retry, we need to drop the old accumulator 
updates. Because we can not revert our changes to an accumulator, we have to 
maintain these accumulator updates by ourselves so as to drop accumulator 
updates for a failed task.

### SPARK-8862: A new SQL tab
Includes two pages:
#### A page for all DataFrame/SQL queries
It will show the running, completed and failed queries in 3 tables. It also 
displays the jobs and their links for a query in each row.
#### A detail page for a DataFrame/SQL query
In this page, it also shows the SparkPlan metrics in real-time. Run a 
long-running query, such as
```
val testData = sc.parallelize((1 to 1000000).map(i => (i, i.toString))).toDF()
testData.select($"_1").filter($"_1" < 1000).foreach(_ => Thread.sleep(60))
```
and you will see the metrics keep updating in real-time.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png"; height=40 alt="Review on 
Reviewable"/>](https://reviewable.io/reviews/apache/spark/7774)
<!-- Reviewable:end -->

Author: zsxwing <zsxw...@gmail.com>

Closes #7774 from zsxwing/sql-ui and squashes the following commits:

5a2bc99 [zsxwing] Remove UISeleniumSuite and its dependency
57d4cd2 [zsxwing] Use VisibleForTesting annotation
cc1c736 [zsxwing] Add SparkPlan.trackNumOfRowsEnabled to make subclasses easy 
to track the number of rows; fix the issue that the "save" action cannot 
collect metrics
3771ab0 [zsxwing] Register SQL metrics accmulators
3a101c0 [zsxwing] Change prepareCalled's type to AtomicBoolean for thread-safety
b8d5605 [zsxwing] Make prepare idempotent; call children's prepare in 
SparkPlan.prepare; change doPrepare to def
4ed11a1 [zsxwing] var -> val
332639c [zsxwing] Ignore UISeleniumSuite and SQLListenerSuite."no memory leak" 
because of SPARK-9580
bb52359 [zsxwing] Address other commens in SQLListener
c4d0f5d [zsxwing] Move newPredicate out of the iterator loop
957473c [zsxwing] Move STATIC_RESOURCE_DIR to object SQLTab
7ab4816 [zsxwing] Make SparkPlan accumulator API private[sql]
dae195e [zsxwing] Fix the code style and comments
3a66207 [zsxwing] Ignore irrelevant accumulators
b8484a1 [zsxwing] Merge branch 'master' into sql-ui
9406592 [zsxwing] Implement the SparkPlan viz
4ebce68 [zsxwing] Add SparkPlan.prepare to support BroadcastHashJoin to run 
background work in parallel
ca1811f [zsxwing] Merge branch 'master' into sql-ui
fef6fc6 [zsxwing] Fix a corner case
25f335c [zsxwing] Fix the code style
6eae828 [zsxwing] SQLSparkListener -> SQLListener; SparkSQLExecutionUIData -> 
SQLExecutionUIData; SparkSQLExecution -> SQLExecution
822af75 [zsxwing] Add SQLSparkListenerSuite and fix the issue about 
onExecutionEnd and onJobEnd
6be626f [zsxwing] Add UISeleniumSuite to test UI
d02a24d [zsxwing] Make ExecutionPage private
23abf73 [zsxwing] [SPARK-8862][SPARK-8862][SQL] Add basic instrumentation to 
each SparkPlan operator and add a new SQL tab

(cherry picked from commit 1b0317f64cfe99ff70580eeb99753cd0d31f849a)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ebc3aad2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ebc3aad2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ebc3aad2

Branch: refs/heads/branch-1.5
Commit: ebc3aad272b91cf58e2e1b4aa92b49b8a947a045
Parents: 6306019
Author: zsxwing <zsxw...@gmail.com>
Authored: Wed Aug 5 01:51:22 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Aug 5 01:52:03 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   |  15 +-
 .../scala/org/apache/spark/SparkContext.scala   |  15 +
 .../org/apache/spark/executor/Executor.scala    |   2 +
 .../spark/sql/ui/static/spark-sql-viz.css       |  37 ++
 .../apache/spark/sql/ui/static/spark-sql-viz.js | 160 +++++++++
 .../scala/org/apache/spark/sql/DataFrame.scala  |  29 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   6 +
 .../spark/sql/execution/ExistingRDD.scala       |   3 +
 .../spark/sql/execution/LocalTableScan.scala    |   2 +
 .../spark/sql/execution/SQLExecution.scala      | 105 ++++++
 .../apache/spark/sql/execution/SparkPlan.scala  |  62 +++-
 .../spark/sql/execution/basicOperators.scala    |  18 +-
 .../sql/execution/datasources/commands.scala    |  39 +-
 .../sql/execution/joins/BroadcastHashJoin.scala |  31 +-
 .../joins/BroadcastHashOuterJoin.scala          |  29 +-
 .../sql/execution/joins/ShuffledHashJoin.scala  |   2 +
 .../sql/execution/joins/SortMergeJoin.scala     |   2 +
 .../apache/spark/sql/ui/AllExecutionsPage.scala | 238 +++++++++++++
 .../org/apache/spark/sql/ui/ExecutionPage.scala | 127 +++++++
 .../org/apache/spark/sql/ui/SQLListener.scala   | 354 +++++++++++++++++++
 .../scala/org/apache/spark/sql/ui/SQLTab.scala  |  41 +++
 .../apache/spark/sql/ui/SparkPlanGraph.scala    | 118 +++++++
 .../org/apache/spark/sql/CachedTableSuite.scala |   8 +-
 .../apache/spark/sql/ui/SQLListenerSuite.scala  | 347 ++++++++++++++++++
 24 files changed, 1735 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala 
b/core/src/main/scala/org/apache/spark/Accumulators.scala
index b6a0119..462d5c9 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -153,11 +153,12 @@ class Accumulable[R, T] private[spark] (
     value_ = zero
     deserialized = true
     // Automatically register the accumulator when it is deserialized with the 
task closure.
-    // Note that internal accumulators are deserialized before the TaskContext 
is created and
-    // are registered in the TaskContext constructor.
-    if (!isInternal) {
-      val taskContext = TaskContext.get()
-      assume(taskContext != null, "Task context was null when deserializing 
user accumulators")
+    //
+    // Note internal accumulators sent with task are deserialized before the 
TaskContext is created
+    // and are registered in the TaskContext constructor. Other internal 
accumulators, such SQL
+    // metrics, still need to register here.
+    val taskContext = TaskContext.get()
+    if (taskContext != null) {
       taskContext.registerAccumulator(this)
     }
   }
@@ -255,8 +256,8 @@ GrowableAccumulableParam[R <% Growable[T] with 
TraversableOnce[T] with Serializa
  * @tparam T result type
  */
 class Accumulator[T] private[spark] (
-    @transient initialValue: T,
-    param: AccumulatorParam[T],
+    @transient private[spark] val initialValue: T,
+    private[spark] val param: AccumulatorParam[T],
     name: Option[String],
     internal: Boolean)
   extends Accumulable[T, T](initialValue, param, name, internal) {

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4380cf4..0c07053 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1239,6 +1239,21 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   }
 
   /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, with 
a name for display
+   * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` 
method. Only the
+   * driver can access the accumulator's `value`. The latest local value of 
such accumulator will be
+   * sent back to the driver via heartbeats.
+   *
+   * @tparam T type that can be added to the accumulator, must be thread safe
+   */
+  private[spark] def internalAccumulator[T](initialValue: T, name: String)(
+    implicit param: AccumulatorParam[T]): Accumulator[T] = {
+    val acc = new Accumulator(initialValue, param, Some(name), internal = true)
+    cleaner.foreach(_.registerAccumulatorForCleanup(acc))
+    acc
+  }
+
+  /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which 
tasks can add values
    * with `+=`. Only the driver can access the accumuable's `value`.
    * @tparam R accumulator result type

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7bc7fce..5d78a9d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -249,6 +249,7 @@ private[spark] class Executor(
           m.setExecutorRunTime((taskFinish - taskStart) - 
task.executorDeserializeTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - 
beforeSerialization)
+          m.updateAccumulators()
         }
 
         val directResult = new DirectTaskResult(valueBytes, accumUpdates, 
task.metrics.orNull)
@@ -300,6 +301,7 @@ private[spark] class Executor(
             task.metrics.map { m =>
               m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
               m.setJvmGCTime(computeTotalGcTime() - startGCTime)
+              m.updateAccumulators()
               m
             }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css 
b/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css
new file mode 100644
index 0000000..ddd3a91
--- /dev/null
+++ 
b/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#plan-viz-graph .label {
+  font-weight: normal;
+  text-shadow: none;
+}
+
+#plan-viz-graph svg g.node rect {
+  fill: #C3EBFF;
+  stroke: #3EC0FF;
+  stroke-width: 1px;
+}
+
+/* Hightlight the SparkPlan node name */
+#plan-viz-graph svg text :first-child {
+  font-weight: bold;
+}
+
+#plan-viz-graph svg path {
+  stroke: #444;
+  stroke-width: 1.5px;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js 
b/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js
new file mode 100644
index 0000000..5161fcd
--- /dev/null
+++ 
b/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+var PlanVizConstants = {
+  svgMarginX: 16,
+  svgMarginY: 16
+};
+
+function renderPlanViz() {
+  var svg = planVizContainer().append("svg");
+  var metadata = d3.select("#plan-viz-metadata");
+  var dot = metadata.select(".dot-file").text().trim();
+  var graph = svg.append("g");
+
+  var g = graphlibDot.read(dot);
+  preprocessGraphLayout(g);
+  var renderer = new dagreD3.render();
+  renderer(graph, g);
+
+  // Round corners on rectangles
+  svg
+    .selectAll("rect")
+    .attr("rx", "5")
+    .attr("ry", "5");
+
+  var nodeSize = parseInt($("#plan-viz-metadata-size").text());
+  for (var i = 0; i < nodeSize; i++) {
+    setupTooltipForSparkPlanNode(i);
+  }
+
+  resizeSvg(svg)
+}
+
+/* -------------------- *
+ * | Helper functions | *
+ * -------------------- */
+
+function planVizContainer() { return d3.select("#plan-viz-graph"); }
+
+/*
+ * Set up the tooltip for a SparkPlan node using metadata. When the user moves 
the mouse on the
+ * node, it will display the details of this SparkPlan node in the right.
+ */
+function setupTooltipForSparkPlanNode(nodeId) {
+  var nodeTooltip = d3.select("#plan-meta-data-" + nodeId).text()
+  d3.select("svg g .node_" + nodeId)
+    .on('mouseover', function(d) {
+      var domNode = d3.select(this).node();
+      $(domNode).tooltip({
+        title: nodeTooltip, trigger: "manual", container: "body", placement: 
"right"
+      });
+      $(domNode).tooltip("show");
+    })
+    .on('mouseout', function(d) {
+      var domNode = d3.select(this).node();
+      $(domNode).tooltip("destroy");
+    })
+}
+
+/*
+ * Helper function to pre-process the graph layout.
+ * This step is necessary for certain styles that affect the positioning
+ * and sizes of graph elements, e.g. padding, font style, shape.
+ */
+function preprocessGraphLayout(g) {
+  var nodes = g.nodes();
+  for (var i = 0; i < nodes.length; i++) {
+      var node = g.node(nodes[i]);
+      node.padding = "5";
+  }
+  // Curve the edges
+  var edges = g.edges();
+  for (var j = 0; j < edges.length; j++) {
+    var edge = g.edge(edges[j]);
+    edge.lineInterpolate = "basis";
+  }
+}
+
+/*
+ * Helper function to size the SVG appropriately such that all elements are 
displayed.
+ * This assumes that all outermost elements are clusters (rectangles).
+ */
+function resizeSvg(svg) {
+  var allClusters = svg.selectAll("g rect")[0];
+  console.log(allClusters);
+  var startX = -PlanVizConstants.svgMarginX +
+    toFloat(d3.min(allClusters, function(e) {
+      console.log(e);
+      return getAbsolutePosition(d3.select(e)).x;
+    }));
+  var startY = -PlanVizConstants.svgMarginY +
+    toFloat(d3.min(allClusters, function(e) {
+      return getAbsolutePosition(d3.select(e)).y;
+    }));
+  var endX = PlanVizConstants.svgMarginX +
+    toFloat(d3.max(allClusters, function(e) {
+      var t = d3.select(e);
+      return getAbsolutePosition(t).x + toFloat(t.attr("width"));
+    }));
+  var endY = PlanVizConstants.svgMarginY +
+    toFloat(d3.max(allClusters, function(e) {
+      var t = d3.select(e);
+      return getAbsolutePosition(t).y + toFloat(t.attr("height"));
+    }));
+  var width = endX - startX;
+  var height = endY - startY;
+  svg.attr("viewBox", startX + " " + startY + " " + width + " " + height)
+     .attr("width", width)
+     .attr("height", height);
+}
+
+/* Helper function to convert attributes to numeric values. */
+function toFloat(f) {
+  if (f) {
+    return parseFloat(f.toString().replace(/px$/, ""));
+  } else {
+    return f;
+  }
+}
+
+/*
+ * Helper function to compute the absolute position of the specified element 
in our graph.
+ */
+function getAbsolutePosition(d3selection) {
+  if (d3selection.empty()) {
+    throw "Attempted to get absolute position of an empty selection.";
+  }
+  var obj = d3selection;
+  var _x = toFloat(obj.attr("x")) || 0;
+  var _y = toFloat(obj.attr("y")) || 0;
+  while (!obj.empty()) {
+    var transformText = obj.attr("transform");
+    if (transformText) {
+      var translate = d3.transform(transformText).translate;
+      _x += toFloat(translate[0]);
+      _y += toFloat(translate[1]);
+    }
+    // Climb upwards to find how our parents are translated
+    obj = d3.select(obj.node().parentNode);
+    // Stop when we've reached the graph container itself
+    if (obj.node() == planVizContainer().node()) {
+      break;
+    }
+  }
+  return { x: _x, y: _y };
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3ea0f9e..1f83567 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, 
SqlParser}
-import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
LogicalRDD}
+import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
LogicalRDD, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation}
 import org.apache.spark.sql.json.{JacksonGenerator, JSONRelation}
 import org.apache.spark.sql.sources.HadoopFsRelation
@@ -119,6 +119,9 @@ class DataFrame private[sql](
     @transient val sqlContext: SQLContext,
     @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) 
extends Serializable {
 
+  // Note for Spark contributors: if adding or updating any action in 
`DataFrame`, please make sure
+  // you wrap it with `withNewExecutionId` if this actions doesn't call other 
action.
+
   /**
    * A constructor that automatically analyzes the logical plan.
    *
@@ -1356,14 +1359,18 @@ class DataFrame private[sql](
    * @group rdd
    * @since 1.3.0
    */
-  def foreach(f: Row => Unit): Unit = rdd.foreach(f)
+  def foreach(f: Row => Unit): Unit = withNewExecutionId {
+    rdd.foreach(f)
+  }
 
   /**
    * Applies a function f to each partition of this [[DataFrame]].
    * @group rdd
    * @since 1.3.0
    */
-  def foreachPartition(f: Iterator[Row] => Unit): Unit = 
rdd.foreachPartition(f)
+  def foreachPartition(f: Iterator[Row] => Unit): Unit = withNewExecutionId {
+    rdd.foreachPartition(f)
+  }
 
   /**
    * Returns the first `n` rows in the [[DataFrame]].
@@ -1377,14 +1384,18 @@ class DataFrame private[sql](
    * @group action
    * @since 1.3.0
    */
-  def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
+  def collect(): Array[Row] = withNewExecutionId {
+    queryExecution.executedPlan.executeCollect()
+  }
 
   /**
    * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]].
    * @group action
    * @since 1.3.0
    */
-  def collectAsList(): java.util.List[Row] = 
java.util.Arrays.asList(rdd.collect() : _*)
+  def collectAsList(): java.util.List[Row] = withNewExecutionId {
+    java.util.Arrays.asList(rdd.collect() : _*)
+  }
 
   /**
    * Returns the number of rows in the [[DataFrame]].
@@ -1863,6 +1874,14 @@ class DataFrame private[sql](
     write.mode(SaveMode.Append).insertInto(tableName)
   }
 
+  /**
+   * Wrap a DataFrame action to track all Spark jobs in the body so that we 
can connect them with
+   * an execution.
+   */
+  private[sql] def withNewExecutionId[T](body: => T): T = {
+    SQLExecution.withNewExecutionId(sqlContext, queryExecution)(body)
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   ////////////////////////////////////////////////////////////////////////////
   // End of deprecated methods

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index dbb2a09..ffc2baf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.ui.{SQLListener, SQLTab}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -74,6 +75,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   protected[sql] def conf = currentSession().conf
 
+  // `listener` should be only used in the driver
+  @transient private[sql] val listener = new SQLListener(this)
+  sparkContext.addSparkListener(listener)
+  sparkContext.ui.foreach(new SQLTab(this, _))
+
   /**
    * Set Spark SQL configuration properties.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index da27a75..fbaa8e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -96,6 +96,9 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow]) extends LeafNode {
+
+  override protected[sql] val trackNumOfRowsEnabled = true
+
   protected override def doExecute(): RDD[InternalRow] = rdd
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 34e926e..858dd85 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -30,6 +30,8 @@ private[sql] case class LocalTableScan(
     output: Seq[Attribute],
     rows: Seq[InternalRow]) extends LeafNode {
 
+  override protected[sql] val trackNumOfRowsEnabled = true
+
   private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
 
   protected override def doExecute(): RDD[InternalRow] = rdd

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
new file mode 100644
index 0000000..97f1323
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.ui.SparkPlanGraph
+import org.apache.spark.util.Utils
+
+private[sql] object SQLExecution {
+
+  val EXECUTION_ID_KEY = "spark.sql.execution.id"
+
+  private val _nextExecutionId = new AtomicLong(0)
+
+  private def nextExecutionId: Long = _nextExecutionId.getAndIncrement
+
+  /**
+   * Wrap an action that will execute "queryExecution" to track all Spark jobs 
in the body so that
+   * we can connect them with an execution.
+   */
+  def withNewExecutionId[T](
+      sqlContext: SQLContext, queryExecution: SQLContext#QueryExecution)(body: 
=> T): T = {
+    val sc = sqlContext.sparkContext
+    val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
+    if (oldExecutionId == null) {
+      val executionId = SQLExecution.nextExecutionId
+      sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
+      val r = try {
+        val callSite = Utils.getCallSite()
+        sqlContext.listener.onExecutionStart(
+          executionId,
+          callSite.shortForm,
+          callSite.longForm,
+          queryExecution.toString,
+          SparkPlanGraph(queryExecution.executedPlan),
+          System.currentTimeMillis())
+        try {
+          body
+        } finally {
+          // Ideally, we need to make sure onExecutionEnd happens after 
onJobStart and onJobEnd.
+          // However, onJobStart and onJobEnd run in the listener thread. 
Because we cannot add new
+          // SQL event types to SparkListener since it's a public API, we 
cannot guarantee that.
+          //
+          // SQLListener should handle the case that onExecutionEnd happens 
before onJobEnd.
+          //
+          // The worst case is onExecutionEnd may happen before onJobStart 
when the listener thread
+          // is very busy. If so, we cannot track the jobs for the execution. 
It seems acceptable.
+          sqlContext.listener.onExecutionEnd(executionId, 
System.currentTimeMillis())
+        }
+      } finally {
+        sc.setLocalProperty(EXECUTION_ID_KEY, null)
+      }
+      r
+    } else {
+      // Don't support nested `withNewExecutionId`. This is an example of the 
nested
+      // `withNewExecutionId`:
+      //
+      // class DataFrame {
+      //   def foo: T = withNewExecutionId { 
something.createNewDataFrame().collect() }
+      // }
+      //
+      // Note: `collect` will call withNewExecutionId
+      // In this case, only the "executedPlan" for "collect" will be executed. 
The "executedPlan"
+      // for the outer DataFrame won't be executed. So it's meaningless to 
create a new Execution
+      // for the outer DataFrame. Even if we track it, since its 
"executedPlan" doesn't run,
+      // all accumulator metrics will be 0. It will confuse people if we show 
them in Web UI.
+      //
+      // A real case is the `DataFrame.count` method.
+      throw new IllegalArgumentException(s"$EXECUTION_ID_KEY is already set")
+    }
+  }
+
+  /**
+   * Wrap an action with a known executionId. When running a different action 
in a different
+   * thread from the original one, this method can be used to connect the 
Spark jobs in this action
+   * with the known executionId, e.g., `BroadcastHashJoin.broadcastFuture`.
+   */
+  def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T 
= {
+    val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    try {
+      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
+      body
+    } finally {
+      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 50c27de..73b237f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.Logging
+import org.apache.spark.{Accumulator, Logging}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.sql.SQLContext
@@ -59,12 +61,39 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
     false
   }
 
+  /**
+   * Whether the "prepare" method is called.
+   */
+  private val prepareCalled = new AtomicBoolean(false)
+
   /** Overridden make copy also propogates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): this.type = {
     SparkPlan.currentContext.set(sqlContext)
     super.makeCopy(newArgs)
   }
 
+  /**
+   * Whether track the number of rows output by this SparkPlan
+   */
+  protected[sql] def trackNumOfRowsEnabled: Boolean = false
+
+  private lazy val numOfRowsAccumulator = sparkContext.internalAccumulator(0L, 
"number of rows")
+
+  /**
+   * Return all accumulators containing metrics of this SparkPlan.
+   */
+  private[sql] def accumulators: Map[String, Accumulator[_]] = if 
(trackNumOfRowsEnabled) {
+      Map("numRows" -> numOfRowsAccumulator)
+    } else {
+      Map.empty
+    }
+
+  /**
+   * Return the accumulator according to the name.
+   */
+  private[sql] def accumulator[T](name: String): Accumulator[T] =
+    accumulators(name).asInstanceOf[Accumulator[T]]
+
   // TODO: Move to `DistributedPlan`
   /** Specifies how data is partitioned across different nodes in the cluster. 
*/
   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG 
WIDTH!
@@ -110,11 +139,40 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
         "Operator will receive unsafe rows as input but cannot process unsafe 
rows")
     }
     RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
-      doExecute()
+      prepare()
+      if (trackNumOfRowsEnabled) {
+        val numRows = accumulator[Long]("numRows")
+        doExecute().map { row =>
+          numRows += 1
+          row
+        }
+      } else {
+        doExecute()
+      }
     }
   }
 
   /**
+   * Prepare a SparkPlan for execution. It's idempotent.
+   */
+  final def prepare(): Unit = {
+    if (prepareCalled.compareAndSet(false, true)) {
+      doPrepare
+      children.foreach(_.prepare())
+    }
+  }
+
+  /**
+   * Overridden by concrete implementations of SparkPlan. It is guaranteed to 
run before any
+   * `execute` of SparkPlan. This is helpful if we want to set up some state 
before executing the
+   * query, e.g., `BroadcastHashJoin` uses it to broadcast asynchronously.
+   *
+   * Note: the prepare method has already walked down the tree, so the 
implementation doesn't need
+   * to call children's prepare methods.
+   */
+  protected def doPrepare(): Unit = {}
+
+  /**
    * Overridden by concrete implementations of SparkPlan.
    * Produces the result of the query as an RDD[InternalRow]
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 5a1b000..4771702 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -81,8 +81,22 @@ case class TungstenProject(projectList: 
Seq[NamedExpression], child: SparkPlan)
 case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
-  protected override def doExecute(): RDD[InternalRow] = 
child.execute().mapPartitions { iter =>
-    iter.filter(newPredicate(condition, child.output))
+  private[sql] override lazy val accumulators = Map(
+    "numInputRows" -> sparkContext.internalAccumulator(0L, "number of input 
rows"),
+    "numOutputRows" -> sparkContext.internalAccumulator(0L, "number of output 
rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numInputRows = accumulator[Long]("numInputRows")
+    val numOutputRows = accumulator[Long]("numOutputRows")
+    child.execute().mapPartitions { iter =>
+      val predicate = newPredicate(condition, child.output)
+      iter.filter { row =>
+        numInputRows += 1
+        val r = predicate(row)
+        if (r) numOutputRows += 1
+        r
+      }
+    }
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
index d551f38..cf19911 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.SerializableConfiguration
@@ -122,25 +122,26 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
       // We create a DataFrame by applying the schema of relation to the data 
to make sure.
       // We are writing data based on the expected schema,
-      val df = {
-        // For partitioned relation r, r.schema's column ordering can be 
different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after 
data column). We
-        // need a Project to adjust the ordering, so that inside 
InsertIntoHadoopFsRelation, we can
-        // safely apply the schema of r.schema to the data.
-        val project = Project(
-          relation.schema.map(field => new 
UnresolvedAttribute(Seq(field.name))), query)
-
-        sqlContext.internalCreateDataFrame(
-          DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema)
-      }
 
-      val partitionColumns = relation.partitionColumns.fieldNames
-      if (partitionColumns.isEmpty) {
-        insert(new DefaultWriterContainer(relation, job, isAppend), df)
-      } else {
-        val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, 
PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
-        insertWithDynamicPartitions(sqlContext, writerContainer, df, 
partitionColumns)
+      // For partitioned relation r, r.schema's column ordering can be 
different from the column
+      // ordering of data.logicalPlan (partition columns are all moved after 
data column). We
+      // need a Project to adjust the ordering, so that inside 
InsertIntoHadoopFsRelation, we can
+      // safely apply the schema of r.schema to the data.
+      val project = Project(
+        relation.schema.map(field => new 
UnresolvedAttribute(Seq(field.name))), query)
+
+      val queryExecution = DataFrame(sqlContext, project).queryExecution
+      SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
+        val df = sqlContext.internalCreateDataFrame(queryExecution.toRdd, 
relation.schema)
+
+        val partitionColumns = relation.partitionColumns.fieldNames
+        if (partitionColumns.isEmpty) {
+          insert(new DefaultWriterContainer(relation, job, isAppend), df)
+        } else {
+          val writerContainer = new DynamicPartitionWriterContainer(
+            relation, job, partitionColumns, 
PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
+          insertWithDynamicPartitions(sqlContext, writerContainer, df, 
partitionColumns)
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index e73e252..ec1a148 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, 
Partitioning, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -45,6 +45,8 @@ case class BroadcastHashJoin(
     right: SparkPlan)
   extends BinaryNode with HashJoin {
 
+  override protected[sql] val trackNumOfRowsEnabled = true
+
   val timeout: Duration = {
     val timeoutValue = sqlContext.conf.broadcastTimeout
     if (timeoutValue < 0) {
@@ -59,13 +61,28 @@ case class BroadcastHashJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
 
+  // Use lazy so that we won't do broadcast when calling explain but still 
cache the broadcast value
+  // for the same query.
   @transient
-  private val broadcastFuture = future {
-    // Note that we use .execute().collect() because we don't want to convert 
data to Scala types
-    val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
-    val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, 
input.size)
-    sparkContext.broadcast(hashed)
-  }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
+  private lazy val broadcastFuture = {
+    // broadcastFuture is used in "doExecute". Therefore we can get the 
execution id correctly here.
+    val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    future {
+      // This will run in another thread. Set the execution id so that we can 
connect these jobs
+      // with the correct execution.
+      SQLExecution.withExecutionId(sparkContext, executionId) {
+        // Note that we use .execute().collect() because we don't want to 
convert data to Scala
+        // types
+        val input: Array[InternalRow] = 
buildPlan.execute().map(_.copy()).collect()
+        val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, 
input.size)
+        sparkContext.broadcast(hashed)
+      }
+    }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
+  }
+
+  protected override def doPrepare(): Unit = {
+    broadcastFuture
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val broadcastRelation = Await.result(broadcastFuture, timeout)

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index c35e439..e342fd9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
Distribution, UnspecifiedDistribution}
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -60,13 +60,28 @@ case class BroadcastHashOuterJoin(
 
   override def outputPartitioning: Partitioning = 
streamedPlan.outputPartitioning
 
+  // Use lazy so that we won't do broadcast when calling explain but still 
cache the broadcast value
+  // for the same query.
   @transient
-  private val broadcastFuture = future {
-    // Note that we use .execute().collect() because we don't want to convert 
data to Scala types
-    val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
-    val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
-    sparkContext.broadcast(hashed)
-  }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
+  private lazy val broadcastFuture = {
+    // broadcastFuture is used in "doExecute". Therefore we can get the 
execution id correctly here.
+    val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    future {
+      // This will run in another thread. Set the execution id so that we can 
connect these jobs
+      // with the correct execution.
+      SQLExecution.withExecutionId(sparkContext, executionId) {
+        // Note that we use .execute().collect() because we don't want to 
convert data to Scala
+        // types
+        val input: Array[InternalRow] = 
buildPlan.execute().map(_.copy()).collect()
+        val hashed = HashedRelation(input.iterator, buildKeyGenerator, 
input.size)
+        sparkContext.broadcast(hashed)
+      }
+    }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
+  }
+
+  protected override def doPrepare(): Unit = {
+    broadcastFuture
+  }
 
   override def doExecute(): RDD[InternalRow] = {
     val broadcastRelation = Await.result(broadcastFuture, timeout)

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index fc6efe8..c923dc8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -38,6 +38,8 @@ case class ShuffledHashJoin(
     right: SparkPlan)
   extends BinaryNode with HashJoin {
 
+  override protected[sql] val trackNumOfRowsEnabled = true
+
   override def outputPartitioning: Partitioning =
     PartitioningCollection(Seq(left.outputPartitioning, 
right.outputPartitioning))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 41be78a..eb59549 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -38,6 +38,8 @@ case class SortMergeJoin(
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode {
 
+  override protected[sql] val trackNumOfRowsEnabled = true
+
   override def output: Seq[Attribute] = left.output ++ right.output
 
   override def outputPartitioning: Partitioning =

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala
new file mode 100644
index 0000000..727fc4b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ui/AllExecutionsPage.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.mutable
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.Logging
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with 
Logging {
+
+  private val listener = parent.listener
+
+  override def render(request: HttpServletRequest): Seq[Node] = {
+    val currentTime = System.currentTimeMillis()
+    val content = listener.synchronized {
+      val _content = mutable.ListBuffer[Node]()
+      if (listener.getRunningExecutions.nonEmpty) {
+        _content ++=
+          new RunningExecutionTable(
+            parent, "Running Queries", currentTime,
+            
listener.getRunningExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
+      }
+      if (listener.getCompletedExecutions.nonEmpty) {
+        _content ++=
+          new CompletedExecutionTable(
+            parent, "Completed Queries", currentTime,
+            
listener.getCompletedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
+      }
+      if (listener.getFailedExecutions.nonEmpty) {
+        _content ++=
+          new FailedExecutionTable(
+            parent, "Failed Queries", currentTime,
+            
listener.getFailedExecutions.sortBy(_.submissionTime).reverse).toNodeSeq
+      }
+      _content
+    }
+    UIUtils.headerSparkPage("SQL", content, parent, Some(5000))
+  }
+}
+
+private[ui] abstract class ExecutionTable(
+    parent: SQLTab,
+    tableId: String,
+    tableName: String,
+    currentTime: Long,
+    executionUIDatas: Seq[SQLExecutionUIData],
+    showRunningJobs: Boolean,
+    showSucceededJobs: Boolean,
+    showFailedJobs: Boolean) {
+
+  protected def baseHeader: Seq[String] = Seq(
+    "ID",
+    "Description",
+    "Submitted",
+    "Duration")
+
+  protected def header: Seq[String]
+
+  protected def row(currentTime: Long, executionUIData: SQLExecutionUIData): 
Seq[Node] = {
+    val submissionTime = executionUIData.submissionTime
+    val duration = executionUIData.completionTime.getOrElse(currentTime) - 
submissionTime
+
+    val runningJobs = executionUIData.runningJobs.map { jobId =>
+      <a href={jobURL(jobId)}>{jobId.toString}</a><br/>
+    }
+    val succeededJobs = executionUIData.succeededJobs.sorted.map { jobId =>
+      <a href={jobURL(jobId)}>{jobId.toString}</a><br/>
+    }
+    val failedJobs = executionUIData.failedJobs.sorted.map { jobId =>
+      <a href={jobURL(jobId)}>{jobId.toString}</a><br/>
+    }
+    <tr>
+      <td>
+        {executionUIData.executionId.toString}
+      </td>
+      <td>
+        {descriptionCell(executionUIData)}
+      </td>
+      <td sorttable_customkey={submissionTime.toString}>
+        {UIUtils.formatDate(submissionTime)}
+      </td>
+      <td sorttable_customkey={duration.toString}>
+        {UIUtils.formatDuration(duration)}
+      </td>
+      {if (showRunningJobs) {
+        <td>
+          {runningJobs}
+        </td>
+      }}
+      {if (showSucceededJobs) {
+        <td>
+          {succeededJobs}
+        </td>
+      }}
+      {if (showFailedJobs) {
+        <td>
+          {failedJobs}
+        </td>
+      }}
+      {detailCell(executionUIData.physicalPlanDescription)}
+    </tr>
+  }
+
+  private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = {
+    val details = if (execution.details.nonEmpty) {
+      <span 
onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stage-details collapsed">
+        <pre>{execution.details}</pre>
+      </div>
+    } else {
+      Nil
+    }
+
+    val desc = {
+      <a href={executionURL(execution.executionId)}>{execution.description}</a>
+    }
+
+    <div>{desc} {details}</div>
+  }
+
+  private def detailCell(physicalPlan: String): Seq[Node] = {
+    val isMultiline = physicalPlan.indexOf('\n') >= 0
+    val summary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        physicalPlan.substring(0, physicalPlan.indexOf('\n'))
+      } else {
+        physicalPlan
+      })
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span 
onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+        <div class="stacktrace-details collapsed">
+          <pre>{physicalPlan}</pre>
+        </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+    <td>{summary}{details}</td>
+  }
+
+  def toNodeSeq: Seq[Node] = {
+    <div>
+      <h4>{tableName}</h4>
+      {UIUtils.listingTable[SQLExecutionUIData](
+        header, row(currentTime, _), executionUIDatas, id = Some(tableId))}
+    </div>
+  }
+
+  private def jobURL(jobId: Long): String =
+    "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId)
+
+  private def executionURL(executionID: Long): String =
+    "%s/sql/execution?id=%s".format(UIUtils.prependBaseUri(parent.basePath), 
executionID)
+}
+
+private[ui] class RunningExecutionTable(
+    parent: SQLTab,
+    tableName: String,
+    currentTime: Long,
+    executionUIDatas: Seq[SQLExecutionUIData])
+  extends ExecutionTable(
+    parent,
+    "running-execution-table",
+    tableName,
+    currentTime,
+    executionUIDatas,
+    showRunningJobs = true,
+    showSucceededJobs = true,
+    showFailedJobs = true) {
+
+  override protected def header: Seq[String] =
+    baseHeader ++ Seq("Running Jobs", "Succeeded Jobs", "Failed Jobs", 
"Detail")
+}
+
+private[ui] class CompletedExecutionTable(
+    parent: SQLTab,
+    tableName: String,
+    currentTime: Long,
+    executionUIDatas: Seq[SQLExecutionUIData])
+  extends ExecutionTable(
+    parent,
+    "completed-execution-table",
+    tableName,
+    currentTime,
+    executionUIDatas,
+    showRunningJobs = false,
+    showSucceededJobs = true,
+    showFailedJobs = false) {
+
+  override protected def header: Seq[String] = baseHeader ++ Seq("Jobs", 
"Detail")
+}
+
+private[ui] class FailedExecutionTable(
+    parent: SQLTab,
+    tableName: String,
+    currentTime: Long,
+    executionUIDatas: Seq[SQLExecutionUIData])
+  extends ExecutionTable(
+    parent,
+    "failed-execution-table",
+    tableName,
+    currentTime,
+    executionUIDatas,
+    showRunningJobs = false,
+    showSucceededJobs = true,
+    showFailedJobs = true) {
+
+  override protected def header: Seq[String] =
+    baseHeader ++ Seq("Succeeded Jobs", "Failed Jobs", "Detail")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
new file mode 100644
index 0000000..52ddf99
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ui/ExecutionPage.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, Unparsed}
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.Logging
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[sql] class ExecutionPage(parent: SQLTab) extends 
WebUIPage("execution") with Logging {
+
+  private val listener = parent.listener
+
+  override def render(request: HttpServletRequest): Seq[Node] = 
listener.synchronized {
+    val parameterExecutionId = request.getParameter("id")
+    require(parameterExecutionId != null && parameterExecutionId.nonEmpty,
+      "Missing execution id parameter")
+
+    val executionId = parameterExecutionId.toLong
+    val content = listener.getExecution(executionId).map { executionUIData =>
+      val currentTime = System.currentTimeMillis()
+      val duration =
+        executionUIData.completionTime.getOrElse(currentTime) - 
executionUIData.submissionTime
+
+      val summary =
+        <div>
+          <ul class="unstyled">
+            <li>
+              <strong>Submitted Time: 
</strong>{UIUtils.formatDate(executionUIData.submissionTime)}
+            </li>
+            <li>
+              <strong>Duration: </strong>{UIUtils.formatDuration(duration)}
+            </li>
+            {if (executionUIData.runningJobs.nonEmpty) {
+              <li>
+                <strong>Running Jobs: </strong>
+                {executionUIData.runningJobs.sorted.map { jobId =>
+                <a href={jobURL(jobId)}>{jobId.toString}</a><span>&nbsp;</span>
+              }}
+              </li>
+            }}
+            {if (executionUIData.succeededJobs.nonEmpty) {
+              <li>
+                <strong>Succeeded Jobs: </strong>
+                {executionUIData.succeededJobs.sorted.map { jobId =>
+                  <a 
href={jobURL(jobId)}>{jobId.toString}</a><span>&nbsp;</span>
+                }}
+              </li>
+            }}
+            {if (executionUIData.failedJobs.nonEmpty) {
+              <li>
+                <strong>Failed Jobs: </strong>
+                {executionUIData.failedJobs.sorted.map { jobId =>
+                  <a 
href={jobURL(jobId)}>{jobId.toString}</a><span>&nbsp;</span>
+                }}
+              </li>
+            }}
+            <li>
+              <strong>Detail: </strong><br/>
+              <pre>{executionUIData.physicalPlanDescription}</pre>
+            </li>
+          </ul>
+        </div>
+
+      val metrics = listener.getExecutionMetrics(executionId)
+
+      summary ++ planVisualization(metrics, executionUIData.physicalPlanGraph)
+    }.getOrElse {
+      <div>No information to display for Plan {executionId}</div>
+    }
+
+    UIUtils.headerSparkPage(s"Details for Query $executionId", content, 
parent, Some(5000))
+  }
+
+
+  private def planVisualizationResources: Seq[Node] = {
+    // scalastyle:off
+    <link rel="stylesheet" 
href={UIUtils.prependBaseUri("/static/sql/spark-sql-viz.css")} type="text/css"/>
+    <script src={UIUtils.prependBaseUri("/static/d3.min.js")}></script>
+    <script src={UIUtils.prependBaseUri("/static/dagre-d3.min.js")}></script>
+    <script 
src={UIUtils.prependBaseUri("/static/graphlib-dot.min.js")}></script>
+    <script 
src={UIUtils.prependBaseUri("/static/sql/spark-sql-viz.js")}></script>
+    // scalastyle:on
+  }
+
+  private def planVisualization(metrics: Map[Long, Any], graph: 
SparkPlanGraph): Seq[Node] = {
+    val metadata = graph.nodes.flatMap { node =>
+      val nodeId = s"plan-meta-data-${node.id}"
+      <div id={nodeId}>{node.desc}</div>
+    }
+
+    <div>
+      <div id="plan-viz-graph"></div>
+      <div id="plan-viz-metadata" style="display:none">
+        <div class="dot-file">
+          {graph.makeDotFile(metrics)}
+        </div>
+        <div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
+        {metadata}
+      </div>
+      {planVisualizationResources}
+      <script>$(function(){{ renderPlanViz(); }})</script>
+    </div>
+  }
+
+  private def jobURL(jobId: Long): String =
+    "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), jobId)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
new file mode 100644
index 0000000..e7b1dd1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLListener.scala
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ui
+
+import scala.collection.mutable
+
+import com.google.common.annotations.VisibleForTesting
+
+import org.apache.spark.{AccumulatorParam, JobExecutionStatus, Logging}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.SQLExecution
+
+private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener 
with Logging {
+
+  private val retainedExecutions =
+    sqlContext.sparkContext.conf.getInt("spark.sql.ui.retainedExecutions", 
1000)
+
+  private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
+
+  // Old data in the following fields must be removed in 
"trimExecutionsIfNecessary".
+  // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up 
old data
+
+  // VisibleForTesting
+  private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]()
+
+  /**
+   * Maintain the relation between job id and execution id so that we can get 
the execution id in
+   * the "onJobEnd" method.
+   */
+  private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
+
+  private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
+
+  private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
+
+  private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
+
+  @VisibleForTesting
+  def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
+    _executionIdToData.toMap
+  }
+
+  @VisibleForTesting
+  def jobIdToExecutionId: Map[Long, Long] = synchronized {
+    _jobIdToExecutionId.toMap
+  }
+
+  @VisibleForTesting
+  def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
+    _stageIdToStageMetrics.toMap
+  }
+
+  private def trimExecutionsIfNecessary(
+      executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = {
+    if (executions.size > retainedExecutions) {
+      val toRemove = math.max(retainedExecutions / 10, 1)
+      executions.take(toRemove).foreach { execution =>
+        for (executionUIData <- 
_executionIdToData.remove(execution.executionId)) {
+          for (jobId <- executionUIData.jobs.keys) {
+            _jobIdToExecutionId.remove(jobId)
+          }
+          for (stageId <- executionUIData.stages) {
+            _stageIdToStageMetrics.remove(stageId)
+          }
+        }
+      }
+      executions.trimStart(toRemove)
+    }
+  }
+
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+    val executionIdString = 
jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+    if (executionIdString == null) {
+      // This is not a job created by SQL
+      return
+    }
+    val executionId = executionIdString.toLong
+    val jobId = jobStart.jobId
+    val stageIds = jobStart.stageIds
+
+    synchronized {
+      activeExecutions.get(executionId).foreach { executionUIData =>
+        executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING
+        executionUIData.stages ++= stageIds
+        stageIds.foreach(stageId =>
+          _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId 
= 0))
+        _jobIdToExecutionId(jobId) = executionId
+      }
+    }
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
+    val jobId = jobEnd.jobId
+    for (executionId <- _jobIdToExecutionId.get(jobId);
+         executionUIData <- _executionIdToData.get(executionId)) {
+      jobEnd.jobResult match {
+        case JobSucceeded => executionUIData.jobs(jobId) = 
JobExecutionStatus.SUCCEEDED
+        case JobFailed(_) => executionUIData.jobs(jobId) = 
JobExecutionStatus.FAILED
+      }
+      if (executionUIData.completionTime.nonEmpty && 
!executionUIData.hasRunningJobs) {
+        // We are the last job of this execution, so mark the execution as 
finished. Note that
+        // `onExecutionEnd` also does this, but currently that can be called 
before `onJobEnd`
+        // since these are called on different threads.
+        markExecutionFinished(executionId)
+      }
+    }
+  }
+
+  override def onExecutorMetricsUpdate(
+      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = 
synchronized {
+    for ((taskId, stageId, stageAttemptID, metrics) <- 
executorMetricsUpdate.taskMetrics) {
+      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, 
finishTask = false)
+    }
+  }
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): 
Unit = synchronized {
+    val stageId = stageSubmitted.stageInfo.stageId
+    val stageAttemptId = stageSubmitted.stageInfo.attemptId
+    // Always override metrics for old stage attempt
+    _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+    updateTaskAccumulatorValues(
+      taskEnd.taskInfo.taskId,
+      taskEnd.stageId,
+      taskEnd.stageAttemptId,
+      taskEnd.taskMetrics,
+      finishTask = true)
+  }
+
+  /**
+   * Update the accumulator values of a task with the latest metrics for this 
task. This is called
+   * every time we receive an executor heartbeat or when a task finishes.
+   */
+  private def updateTaskAccumulatorValues(
+      taskId: Long,
+      stageId: Int,
+      stageAttemptID: Int,
+      metrics: TaskMetrics,
+      finishTask: Boolean): Unit = {
+    if (metrics == null) {
+      return
+    }
+
+    _stageIdToStageMetrics.get(stageId) match {
+      case Some(stageMetrics) =>
+        if (stageAttemptID < stageMetrics.stageAttemptId) {
+          // A task of an old stage attempt. Because a new stage is submitted, 
we can ignore it.
+        } else if (stageAttemptID > stageMetrics.stageAttemptId) {
+          logWarning(s"A task should not have a higher stageAttemptID 
($stageAttemptID) then " +
+            s"what we have seen (${stageMetrics.stageAttemptId}})")
+        } else {
+          // TODO We don't know the attemptId. Currently, what we can do is 
overriding the
+          // accumulator updates. However, if there are two same task are 
running, such as
+          // speculation, the accumulator updates will be overriding by 
different task attempts,
+          // the results will be weird.
+          stageMetrics.taskIdToMetricUpdates.get(taskId) match {
+            case Some(taskMetrics) =>
+              if (finishTask) {
+                taskMetrics.finished = true
+                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
+              } else if (!taskMetrics.finished) {
+                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
+              } else {
+                // If a task is finished, we should not override with 
accumulator updates from
+                // heartbeat reports
+              }
+            case None =>
+              // TODO Now just set attemptId to 0. Should fix here when we can 
get the attempt
+              // id from SparkListenerExecutorMetricsUpdate
+              stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
+                  attemptId = 0, finished = finishTask, 
metrics.accumulatorUpdates())
+          }
+        }
+      case None =>
+      // This execution and its stage have been dropped
+    }
+  }
+
+  def onExecutionStart(
+      executionId: Long,
+      description: String,
+      details: String,
+      physicalPlanDescription: String,
+      physicalPlanGraph: SparkPlanGraph,
+      time: Long): Unit = {
+    val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+      node.metrics.map(metric => metric.accumulatorId -> metric)
+    }
+
+    val executionUIData = new SQLExecutionUIData(executionId, description, 
details,
+      physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time)
+    synchronized {
+      activeExecutions(executionId) = executionUIData
+      _executionIdToData(executionId) = executionUIData
+    }
+  }
+
+  def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized {
+    _executionIdToData.get(executionId).foreach { executionUIData =>
+      executionUIData.completionTime = Some(time)
+      if (!executionUIData.hasRunningJobs) {
+        // onExecutionEnd happens after all "onJobEnd"s
+        // So we should update the execution lists.
+        markExecutionFinished(executionId)
+      } else {
+        // There are some running jobs, onExecutionEnd happens before some 
"onJobEnd"s.
+        // Then we don't if the execution is successful, so let the last 
onJobEnd updates the
+        // execution lists.
+      }
+    }
+  }
+
+  private def markExecutionFinished(executionId: Long): Unit = {
+    activeExecutions.remove(executionId).foreach { executionUIData =>
+      if (executionUIData.isFailed) {
+        failedExecutions += executionUIData
+        trimExecutionsIfNecessary(failedExecutions)
+      } else {
+        completedExecutions += executionUIData
+        trimExecutionsIfNecessary(completedExecutions)
+      }
+    }
+  }
+
+  def getRunningExecutions: Seq[SQLExecutionUIData] = synchronized {
+    activeExecutions.values.toSeq
+  }
+
+  def getFailedExecutions: Seq[SQLExecutionUIData] = synchronized {
+    failedExecutions
+  }
+
+  def getCompletedExecutions: Seq[SQLExecutionUIData] = synchronized {
+    completedExecutions
+  }
+
+  def getExecution(executionId: Long): Option[SQLExecutionUIData] = 
synchronized {
+    _executionIdToData.get(executionId)
+  }
+
+  /**
+   * Get all accumulator updates from all tasks which belong to this execution 
and merge them.
+   */
+  def getExecutionMetrics(executionId: Long): Map[Long, Any] = synchronized {
+    _executionIdToData.get(executionId) match {
+      case Some(executionUIData) =>
+        val accumulatorUpdates = {
+          for (stageId <- executionUIData.stages;
+               stageMetrics <- _stageIdToStageMetrics.get(stageId).toIterable;
+               taskMetrics <- stageMetrics.taskIdToMetricUpdates.values;
+               accumulatorUpdate <- taskMetrics.accumulatorUpdates.toSeq) 
yield {
+            accumulatorUpdate
+          }
+        }.filter { case (id, _) => 
executionUIData.accumulatorMetrics.keySet(id) }
+        mergeAccumulatorUpdates(accumulatorUpdates, accumulatorId =>
+          executionUIData.accumulatorMetrics(accumulatorId).accumulatorParam)
+      case None =>
+        // This execution has been dropped
+        Map.empty
+    }
+  }
+
+  private def mergeAccumulatorUpdates(
+      accumulatorUpdates: Seq[(Long, Any)],
+      paramFunc: Long => AccumulatorParam[Any]): Map[Long, Any] = {
+    accumulatorUpdates.groupBy(_._1).map { case (accumulatorId, values) =>
+      val param = paramFunc(accumulatorId)
+      (accumulatorId, values.map(_._2).reduceLeft(param.addInPlace))
+    }
+  }
+
+}
+
+/**
+ * Represent all necessary data for an execution that will be used in Web UI.
+ */
+private[ui] class SQLExecutionUIData(
+    val executionId: Long,
+    val description: String,
+    val details: String,
+    val physicalPlanDescription: String,
+    val physicalPlanGraph: SparkPlanGraph,
+    val accumulatorMetrics: Map[Long, SQLPlanMetric],
+    val submissionTime: Long,
+    var completionTime: Option[Long] = None,
+    val jobs: mutable.HashMap[Long, JobExecutionStatus] = 
mutable.HashMap.empty,
+    val stages: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer()) {
+
+  /**
+   * Return whether there are running jobs in this execution.
+   */
+  def hasRunningJobs: Boolean = jobs.values.exists(_ == 
JobExecutionStatus.RUNNING)
+
+  /**
+   * Return whether there are any failed jobs in this execution.
+   */
+  def isFailed: Boolean = jobs.values.exists(_ == JobExecutionStatus.FAILED)
+
+  def runningJobs: Seq[Long] =
+    jobs.filter { case (_, status) => status == JobExecutionStatus.RUNNING 
}.keys.toSeq
+
+  def succeededJobs: Seq[Long] =
+    jobs.filter { case (_, status) => status == JobExecutionStatus.SUCCEEDED 
}.keys.toSeq
+
+  def failedJobs: Seq[Long] =
+    jobs.filter { case (_, status) => status == JobExecutionStatus.FAILED 
}.keys.toSeq
+}
+
+/**
+ * Represent a metric in a SQLPlan.
+ *
+ * Because we cannot revert our changes for an "Accumulator", we need to 
maintain accumulator
+ * updates for each task. So that if a task is retried, we can simply override 
the old updates with
+ * the new updates of the new attempt task. Since we cannot add them to 
accumulator, we need to use
+ * "AccumulatorParam" to get the aggregation value.
+ */
+private[ui] case class SQLPlanMetric(
+    name: String,
+    accumulatorId: Long,
+    accumulatorParam: AccumulatorParam[Any])
+
+/**
+ * Store all accumulatorUpdates for all tasks in a Spark stage.
+ */
+private[ui] class SQLStageMetrics(
+    val stageAttemptId: Long,
+    val taskIdToMetricUpdates: mutable.HashMap[Long, SQLTaskMetrics] = 
mutable.HashMap.empty)
+
+/**
+ * Store all accumulatorUpdates for a Spark task.
+ */
+private[ui] class SQLTaskMetrics(
+    val attemptId: Long, // TODO not used yet
+    var finished: Boolean,
+    var accumulatorUpdates: Map[Long, Any])

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
new file mode 100644
index 0000000..a9e5226
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ui/SQLTab.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ui
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
+  extends SparkUITab(sparkUI, "sql") with Logging {
+
+
+  val parent = sparkUI
+  val listener = sqlContext.listener
+
+  attachPage(new AllExecutionsPage(this))
+  attachPage(new ExecutionPage(this))
+  parent.attachTab(this)
+
+  parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, "/static/sql")
+}
+
+private[sql] object SQLTab {
+
+  private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/ui/static"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
new file mode 100644
index 0000000..7910c16
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.ui
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.AccumulatorParam
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * A graph used for storing information of an executionPlan of DataFrame.
+ *
+ * Each graph is defined with a set of nodes and a set of edges. Each node 
represents a node in the
+ * SparkPlan tree, and each edge represents a parent-child relationship 
between two nodes.
+ */
+private[ui] case class SparkPlanGraph(
+    nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) {
+
+  def makeDotFile(metrics: Map[Long, Any]): String = {
+    val dotFile = new StringBuilder
+    dotFile.append("digraph G {\n")
+    nodes.foreach(node => dotFile.append(node.makeDotNode(metrics) + "\n"))
+    edges.foreach(edge => dotFile.append(edge.makeDotEdge + "\n"))
+    dotFile.append("}")
+    dotFile.toString()
+  }
+}
+
+private[sql] object SparkPlanGraph {
+
+  /**
+   * Build a SparkPlanGraph from the root of a SparkPlan tree.
+   */
+  def apply(plan: SparkPlan): SparkPlanGraph = {
+    val nodeIdGenerator = new AtomicLong(0)
+    val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
+    val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
+    buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
+    new SparkPlanGraph(nodes, edges)
+  }
+
+  private def buildSparkPlanGraphNode(
+      plan: SparkPlan,
+      nodeIdGenerator: AtomicLong,
+      nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
+      edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
+    val metrics = plan.accumulators.toSeq.map { case (key, accumulator) =>
+      SQLPlanMetric(accumulator.name.getOrElse(key), accumulator.id,
+        accumulator.param.asInstanceOf[AccumulatorParam[Any]])
+    }
+    val node = SparkPlanGraphNode(
+      nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, 
metrics)
+    nodes += node
+    val childrenNodes = plan.children.map(
+      child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
+    for (child <- childrenNodes) {
+      edges += SparkPlanGraphEdge(child.id, node.id)
+    }
+    node
+  }
+}
+
+/**
+ * Represent a node in the SparkPlan tree, along with its metrics.
+ *
+ * @param id generated by "SparkPlanGraph". There is no duplicate id in a graph
+ * @param name the name of this SparkPlan node
+ * @param metrics metrics that this SparkPlan node will track
+ */
+private[ui] case class SparkPlanGraphNode(
+    id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) {
+
+  def makeDotNode(metricsValue: Map[Long, Any]): String = {
+    val values = {
+      for (metric <- metrics;
+           value <- metricsValue.get(metric.accumulatorId)) yield {
+        metric.name + ": " + value
+      }
+    }
+    val label = if (values.isEmpty) {
+        name
+      } else {
+        // If there are metrics, display all metrics in a separate line. We 
should use an escaped
+        // "\n" here to follow the dot syntax.
+        //
+        // Note: whitespace between two "\n"s is to create an empty line 
between the name of
+        // SparkPlan and metrics. If removing it, it won't display the empty 
line in UI.
+        name + "\\n \\n" + values.mkString("\\n")
+      }
+    s"""  $id [label="$label"];"""
+  }
+}
+
+/**
+ * Represent an edge in the SparkPlan tree. `fromId` is the parent node id, 
and `toId` is the child
+ * node id.
+ */
+private[ui] case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
+
+  def makeDotEdge: String = s"""  $fromId->$toId;\n"""
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ebc3aad2/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index eb3e913..e9dd7ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -305,12 +305,8 @@ class CachedTableSuite extends QueryTest {
     sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
     sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
 
-    Accumulators.synchronized {
-      val accsSize = Accumulators.originals.size
-      ctx.cacheTable("t1")
-      ctx.cacheTable("t2")
-      assert((accsSize + 2) == Accumulators.originals.size)
-    }
+    ctx.cacheTable("t1")
+    ctx.cacheTable("t2")
 
     sql("SELECT * FROM t1").count()
     sql("SELECT * FROM t2").count()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to