Repository: spark
Updated Branches:
  refs/heads/master f2fbfa444 -> 2c5dee0fb


Revert "[SPARK-11206] Support SQL UI on the history server"

This reverts commit cc243a079b1c039d6e7f0b410d1654d94a090e14 / PR #9297

I'm reverting this because it broke SQLListenerMemoryLeakSuite in the master 
Maven builds.

See #9991 for a discussion of why this broke the tests.
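
For context, the reverted change made SparkListenerEvent an open (non-sealed), JSON-serializable trait and added a catch-all onOtherEvent hook to SparkListener, which is how SQL execution events reached the event log and, via SparkHistoryListenerFactory, the history server UI. The sketch below is reconstructed from the removed code in the diff that follows; the event and listener names are hypothetical and only illustrate the extension point this revert removes.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

    // Hypothetical module-defined event: before this revert, SparkListenerEvent was a
    // public non-sealed trait (annotated with @JsonTypeInfo), so other modules such as
    // SQL could define events like this and have them written to the event log.
    case class HypotheticalSQLExecutionStart(executionId: Long, time: Long)
      extends SparkListenerEvent

    // Hypothetical listener: custom events were delivered through the catch-all
    // onOtherEvent hook that this revert removes from SparkListener and SparkListenerBus.
    class HypotheticalSQLEventListener extends SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case HypotheticalSQLExecutionStart(id, time) =>
          println(s"execution $id started at $time")
        case _ => // ignore events this listener does not handle
      }
    }

After the revert, SQLExecution drives SQLListener directly through sqlContext.listener.onExecutionStart / onExecutionEnd instead of posting events to the listener bus, as the SQLExecution.scala hunk below shows.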


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c5dee0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c5dee0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c5dee0f

Branch: refs/heads/master
Commit: 2c5dee0fb8e4d1734ea3a0f22e0b5bfd2f6dba46
Parents: f2fbfa4
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Nov 30 13:41:52 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Nov 30 13:42:35 2015 -0800

----------------------------------------------------------------------
 .rat-excludes                                   |   1 -
 .../org/apache/spark/JavaSparkListener.java     |   3 -
 .../org/apache/spark/SparkFirehoseListener.java |   4 -
 .../spark/scheduler/EventLoggingListener.scala  |   4 -
 .../apache/spark/scheduler/SparkListener.scala  |  24 +---
 .../spark/scheduler/SparkListenerBus.scala      |   1 -
 .../scala/org/apache/spark/ui/SparkUI.scala     |  16 +--
 .../org/apache/spark/util/JsonProtocol.scala    |  11 +-
 ....spark.scheduler.SparkHistoryListenerFactory |   1 -
 .../scala/org/apache/spark/sql/SQLContext.scala |  18 +--
 .../spark/sql/execution/SQLExecution.scala      |  24 +++-
 .../spark/sql/execution/SparkPlanInfo.scala     |  46 ------
 .../sql/execution/metric/SQLMetricInfo.scala    |  30 ----
 .../spark/sql/execution/metric/SQLMetrics.scala |  56 +++-----
 .../spark/sql/execution/ui/ExecutionPage.scala  |   4 +-
 .../spark/sql/execution/ui/SQLListener.scala    | 139 ++++++-------------
 .../apache/spark/sql/execution/ui/SQLTab.scala  |  12 +-
 .../spark/sql/execution/ui/SparkPlanGraph.scala |  20 +--
 .../sql/execution/metric/SQLMetricsSuite.scala  |   4 +-
 .../sql/execution/ui/SQLListenerSuite.scala     |  43 +++---
 .../spark/sql/test/SharedSQLContext.scala       |   1 -
 21 files changed, 135 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 7262c96..08fba6d 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -82,5 +82,4 @@ INDEX
 gen-java.*
 .*avpr
 org.apache.spark.sql.sources.DataSourceRegister
-org.apache.spark.scheduler.SparkHistoryListenerFactory
 .*parquet

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/core/src/main/java/org/apache/spark/JavaSparkListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java 
b/core/src/main/java/org/apache/spark/JavaSparkListener.java
index 23bc9a2..fa9acf0 100644
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -82,7 +82,4 @@ public class JavaSparkListener implements SparkListener {
   @Override
   public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
 
-  @Override
-  public void onOtherEvent(SparkListenerEvent event) { }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java 
b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index e6b24af..1214d05 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -118,8 +118,4 @@ public class SparkFirehoseListener implements SparkListener 
{
         onEvent(blockUpdated);
     }
 
-    @Override
-    public void onOtherEvent(SparkListenerEvent event) {
-        onEvent(event);
-    }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index eaa07ac..000a021 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -207,10 +207,6 @@ private[spark] class EventLoggingListener(
   // No-op because logging every update would be overkill
   override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = { }
 
-  override def onOtherEvent(event: SparkListenerEvent): Unit = {
-    logEvent(event, flushLogger = true)
-  }
-
   /**
    * Stop logging events. The event log file will be renamed so that it loses 
the
    * ".inprogress" suffix.

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 075a7f1..896f174 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -22,19 +22,15 @@ import java.util.Properties
 import scala.collection.Map
 import scala.collection.mutable
 
-import com.fasterxml.jackson.annotation.JsonTypeInfo
-
-import org.apache.spark.{Logging, SparkConf, TaskEndReason}
+import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 import org.apache.spark.util.{Distribution, Utils}
-import org.apache.spark.ui.SparkUI
 
 @DeveloperApi
-@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, 
property = "Event")
-trait SparkListenerEvent
+sealed trait SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: 
Properties = null)
@@ -135,17 +131,6 @@ case class SparkListenerApplicationEnd(time: Long) extends 
SparkListenerEvent
 private[spark] case class SparkListenerLogStart(sparkVersion: String) extends 
SparkListenerEvent
 
 /**
- * Interface for creating history listeners defined in other modules like SQL, 
which are used to
- * rebuild the history UI.
- */
-private[spark] trait SparkHistoryListenerFactory {
-  /**
-   * Create listeners used to rebuild the history UI.
-   */
-  def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
-}
-
-/**
  * :: DeveloperApi ::
  * Interface for listening to events from the Spark scheduler. Note that this 
is an internal
  * interface which might change in different Spark releases. Java clients 
should extend
@@ -238,11 +223,6 @@ trait SparkListener {
    * Called when the driver receives a block update info.
    */
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
-
-  /**
-   * Called when other events like SQL-specific events are posted.
-   */
-  def onOtherEvent(event: SparkListenerEvent) { }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 95722a0..04afde3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,7 +61,6 @@ private[spark] trait SparkListenerBus extends 
ListenerBus[SparkListener, SparkLi
       case blockUpdated: SparkListenerBlockUpdated =>
         listener.onBlockUpdated(blockUpdated)
       case logStart: SparkListenerLogStart => // ignore event log metadata
-      case _ => listener.onOtherEvent(event)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 8da6884..4608bce 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,13 +17,10 @@
 
 package org.apache.spark.ui
 
-import java.util.{Date, ServiceLoader}
-
-import scala.collection.JavaConverters._
+import java.util.Date
 
 import org.apache.spark.status.api.v1.{ApiRootResource, 
ApplicationAttemptInfo, ApplicationInfo,
   UIRoot}
-import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
@@ -157,16 +154,7 @@ private[spark] object SparkUI {
       appName: String,
       basePath: String,
       startTime: Long): SparkUI = {
-    val sparkUI = create(
-      None, conf, listenerBus, securityManager, appName, basePath, startTime = 
startTime)
-
-    val listenerFactories = 
ServiceLoader.load(classOf[SparkHistoryListenerFactory],
-      Utils.getContextOrSparkClassLoader).asScala
-    listenerFactories.foreach { listenerFactory =>
-      val listeners = listenerFactory.createListeners(conf, sparkUI)
-      listeners.foreach(listenerBus.addListener)
-    }
-    sparkUI
+    create(None, conf, listenerBus, securityManager, appName, basePath, 
startTime = startTime)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 7f5d713..c9beeb2 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,21 +19,19 @@ package org.apache.spark.util
 
 import java.util.{Properties, UUID}
 
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
-import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
 
 /**
@@ -56,8 +54,6 @@ private[spark] object JsonProtocol {
 
   private implicit val format = DefaultFormats
 
-  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
-
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
@@ -100,7 +96,6 @@ private[spark] object JsonProtocol {
         executorMetricsUpdateToJson(metricsUpdate)
       case blockUpdated: SparkListenerBlockUpdated =>
         throw new MatchError(blockUpdated)  // TODO(ekl) implement this
-      case _ => parse(mapper.writeValueAsString(event))
     }
   }
 
@@ -516,8 +511,6 @@ private[spark] object JsonProtocol {
       case `executorRemoved` => executorRemovedFromJson(json)
       case `logStart` => logStartFromJson(json)
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
-      case other => mapper.readValue(compact(render(json)), 
Utils.classForName(other))
-        .asInstanceOf[SparkListenerEvent]
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
deleted file mode 100644
index 507100b..0000000
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 8d27839..9cc65de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1263,8 +1263,6 @@ object SQLContext {
    */
   @transient private val instantiatedContext = new 
AtomicReference[SQLContext]()
 
-  @transient private val sqlListener = new AtomicReference[SQLListener]()
-
   /**
    * Get the singleton SQLContext if it exists or create a new one using the 
given SparkContext.
    *
@@ -1309,10 +1307,6 @@ object SQLContext {
     Option(instantiatedContext.get())
   }
 
-  private[sql] def clearSqlListener(): Unit = {
-    sqlListener.set(null)
-  }
-
   /**
    * Changes the SQLContext that will be returned in this thread and its 
children when
    * SQLContext.getOrCreate() is called. This can be used to ensure that a 
given thread receives
@@ -1361,13 +1355,9 @@ object SQLContext {
   * Create a SQLListener, add it to the SparkContext, and create a SQLTab if there is a SparkUI.
    */
   private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
-    if (sqlListener.get() == null) {
-      val listener = new SQLListener(sc.conf)
-      if (sqlListener.compareAndSet(null, listener)) {
-        sc.addSparkListener(listener)
-        sc.ui.foreach(new SQLTab(listener, _))
-      }
-    }
-    sqlListener.get()
+    val listener = new SQLListener(sc.conf)
+    sc.addSparkListener(listener)
+    sc.ui.foreach(new SQLTab(listener, _))
+    listener
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 3497198..1422e15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -21,8 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart,
-  SparkListenerSQLExecutionEnd}
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.util.Utils
 
 private[sql] object SQLExecution {
@@ -46,14 +45,25 @@ private[sql] object SQLExecution {
       sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
       val r = try {
         val callSite = Utils.getCallSite()
-        
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
-          executionId, callSite.shortForm, callSite.longForm, 
queryExecution.toString,
-          SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), 
System.currentTimeMillis()))
+        sqlContext.listener.onExecutionStart(
+          executionId,
+          callSite.shortForm,
+          callSite.longForm,
+          queryExecution.toString,
+          SparkPlanGraph(queryExecution.executedPlan),
+          System.currentTimeMillis())
         try {
           body
         } finally {
-          
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
-            executionId, System.currentTimeMillis()))
+          // Ideally, we need to make sure onExecutionEnd happens after 
onJobStart and onJobEnd.
+          // However, onJobStart and onJobEnd run in the listener thread. 
Because we cannot add new
+          // SQL event types to SparkListener since it's a public API, we 
cannot guarantee that.
+          //
+          // SQLListener should handle the case that onExecutionEnd happens 
before onJobEnd.
+          //
+          // The worst case is onExecutionEnd may happen before onJobStart 
when the listener thread
+          // is very busy. If so, we cannot track the jobs for the execution. 
It seems acceptable.
+          sqlContext.listener.onExecutionEnd(executionId, 
System.currentTimeMillis())
         }
       } finally {
         sc.setLocalProperty(EXECUTION_ID_KEY, null)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
deleted file mode 100644
index 486ce34..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.metric.SQLMetricInfo
-import org.apache.spark.util.Utils
-
-/**
- * :: DeveloperApi ::
- * Stores information about a SQL SparkPlan.
- */
-@DeveloperApi
-class SparkPlanInfo(
-    val nodeName: String,
-    val simpleString: String,
-    val children: Seq[SparkPlanInfo],
-    val metrics: Seq[SQLMetricInfo])
-
-private[sql] object SparkPlanInfo {
-
-  def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
-    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
-      new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
-        Utils.getFormattedClassName(metric.param))
-    }
-    val children = plan.children.map(fromSparkPlan)
-
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
deleted file mode 100644
index 2708219..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.metric
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Stores information about a SQL Metric.
- */
-@DeveloperApi
-class SQLMetricInfo(
-    val name: String,
-    val accumulatorId: Long,
-    val metricParam: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 6c0f6f8..1c253e3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -104,39 +104,21 @@ private class LongSQLMetricParam(val stringValue: 
Seq[Long] => String, initialVa
   override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue)
 }
 
-private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 
0L)
-
-private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
-  (values: Seq[Long]) => {
-    // This is a workaround for SPARK-11013.
-    // We use -1 as the initial value of the accumulator. If the accumulator is valid, we
-    // will update it at the end of the task and the value will be at least 0.
-    val validValues = values.filter(_ >= 0)
-    val Seq(sum, min, med, max) = {
-      val metric = if (validValues.length == 0) {
-        Seq.fill(4)(0L)
-      } else {
-        val sorted = validValues.sorted
-        Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), 
sorted(validValues.length - 1))
-      }
-      metric.map(Utils.bytesToString)
-    }
-    s"\n$sum ($min, $med, $max)"
-  }, -1L)
-
 private[sql] object SQLMetrics {
 
   private def createLongMetric(
       sc: SparkContext,
       name: String,
-      param: LongSQLMetricParam): LongSQLMetric = {
+      stringValue: Seq[Long] => String,
+      initialValue: Long): LongSQLMetric = {
+    val param = new LongSQLMetricParam(stringValue, initialValue)
     val acc = new LongSQLMetric(name, param)
     sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
   }
 
   def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = {
-    createLongMetric(sc, name, LongSQLMetricParam)
+    createLongMetric(sc, name, _.sum.toString, 0L)
   }
 
   /**
@@ -144,25 +126,31 @@ private[sql] object SQLMetrics {
    * spill size, etc.
    */
   def createSizeMetric(sc: SparkContext, name: String): LongSQLMetric = {
+    val stringValue = (values: Seq[Long]) => {
+      // This is a workaround for SPARK-11013.
+      // We use -1 as the initial value of the accumulator. If the accumulator is valid, we
+      // will update it at the end of the task and the value will be at least 0.
+      val validValues = values.filter(_ >= 0)
+      val Seq(sum, min, med, max) = {
+        val metric = if (validValues.length == 0) {
+          Seq.fill(4)(0L)
+        } else {
+          val sorted = validValues.sorted
+          Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), 
sorted(validValues.length - 1))
+        }
+        metric.map(Utils.bytesToString)
+      }
+      s"\n$sum ($min, $med, $max)"
+    }
     // The final result of this metric in physical operator UI may look like:
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
-    createLongMetric(sc, s"$name total (min, med, max)", 
StaticsLongSQLMetricParam)
-  }
-
-  def getMetricParam(metricParamName: String): 
SQLMetricParam[SQLMetricValue[Any], Any] = {
-    val longSQLMetricParam = Utils.getFormattedClassName(LongSQLMetricParam)
-    val staticsSQLMetricParam = 
Utils.getFormattedClassName(StaticsLongSQLMetricParam)
-    val metricParam = metricParamName match {
-      case `longSQLMetricParam` => LongSQLMetricParam
-      case `staticsSQLMetricParam` => StaticsLongSQLMetricParam
-    }
-    metricParam.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]
+    createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L)
   }
 
   /**
   * A metric whose value will be ignored. Use this one when we need a metric
   * parameter but don't care about the value.
    */
-  val nullLongMetric = new LongSQLMetric("null", LongSQLMetricParam)
+  val nullLongMetric = new LongSQLMetric("null", new 
LongSQLMetricParam(_.sum.toString, 0L))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index c74ad40..e74d6fb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.Node
+import scala.xml.{Node, Unparsed}
+
+import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index e19a1e3..5a072de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,34 +19,11 @@ package org.apache.spark.sql.execution.ui
 
 import scala.collection.mutable
 
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.SparkPlanInfo
-import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, 
SQLMetricValue, SQLMetricParam}
+import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
 import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
-import org.apache.spark.ui.SparkUI
-
-@DeveloperApi
-case class SparkListenerSQLExecutionStart(
-    executionId: Long,
-    description: String,
-    details: String,
-    physicalPlanDescription: String,
-    sparkPlanInfo: SparkPlanInfo,
-    time: Long)
-  extends SparkListenerEvent
-
-@DeveloperApi
-case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
-  extends SparkListenerEvent
-
-private[sql] class SQLHistoryListenerFactory extends 
SparkHistoryListenerFactory {
-
-  override def createListeners(conf: SparkConf, sparkUI: SparkUI): 
Seq[SparkListener] = {
-    List(new SQLHistoryListener(conf, sparkUI))
-  }
-}
 
 private[sql] class SQLListener(conf: SparkConf) extends SparkListener with 
Logging {
 
@@ -141,8 +118,7 @@ private[sql] class SQLListener(conf: SparkConf) extends 
SparkListener with Loggi
   override def onExecutorMetricsUpdate(
       executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = 
synchronized {
     for ((taskId, stageId, stageAttemptID, metrics) <- 
executorMetricsUpdate.taskMetrics) {
-      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, 
metrics.accumulatorUpdates(),
-        finishTask = false)
+      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, 
finishTask = false)
     }
   }
 
@@ -164,7 +140,7 @@ private[sql] class SQLListener(conf: SparkConf) extends 
SparkListener with Loggi
       taskEnd.taskInfo.taskId,
       taskEnd.stageId,
       taskEnd.stageAttemptId,
-      taskEnd.taskMetrics.accumulatorUpdates(),
+      taskEnd.taskMetrics,
       finishTask = true)
   }
 
@@ -172,12 +148,15 @@ private[sql] class SQLListener(conf: SparkConf) extends 
SparkListener with Loggi
    * Update the accumulator values of a task with the latest metrics for this 
task. This is called
    * every time we receive an executor heartbeat or when a task finishes.
    */
-  protected def updateTaskAccumulatorValues(
+  private def updateTaskAccumulatorValues(
       taskId: Long,
       stageId: Int,
       stageAttemptID: Int,
-      accumulatorUpdates: Map[Long, Any],
+      metrics: TaskMetrics,
       finishTask: Boolean): Unit = {
+    if (metrics == null) {
+      return
+    }
 
     _stageIdToStageMetrics.get(stageId) match {
       case Some(stageMetrics) =>
@@ -195,9 +174,9 @@ private[sql] class SQLListener(conf: SparkConf) extends 
SparkListener with Loggi
             case Some(taskMetrics) =>
               if (finishTask) {
                 taskMetrics.finished = true
-                taskMetrics.accumulatorUpdates = accumulatorUpdates
+                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
               } else if (!taskMetrics.finished) {
-                taskMetrics.accumulatorUpdates = accumulatorUpdates
+                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
               } else {
                 // If a task is finished, we should not override with 
accumulator updates from
                 // heartbeat reports
@@ -206,7 +185,7 @@ private[sql] class SQLListener(conf: SparkConf) extends 
SparkListener with Loggi
               // TODO Now just set attemptId to 0. Should fix here when we can 
get the attempt
               // id from SparkListenerExecutorMetricsUpdate
               stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
-                  attemptId = 0, finished = finishTask, accumulatorUpdates)
+                  attemptId = 0, finished = finishTask, 
metrics.accumulatorUpdates())
           }
         }
       case None =>
@@ -214,40 +193,38 @@ private[sql] class SQLListener(conf: SparkConf) extends 
SparkListener with Loggi
     }
   }
 
-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-    case SparkListenerSQLExecutionStart(executionId, description, details,
-      physicalPlanDescription, sparkPlanInfo, time) =>
-      val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
-      val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
-        node.metrics.map(metric => metric.accumulatorId -> metric)
-      }
-      val executionUIData = new SQLExecutionUIData(
-        executionId,
-        description,
-        details,
-        physicalPlanDescription,
-        physicalPlanGraph,
-        sqlPlanMetrics.toMap,
-        time)
-      synchronized {
-        activeExecutions(executionId) = executionUIData
-        _executionIdToData(executionId) = executionUIData
-      }
-    case SparkListenerSQLExecutionEnd(executionId, time) => synchronized {
-      _executionIdToData.get(executionId).foreach { executionUIData =>
-        executionUIData.completionTime = Some(time)
-        if (!executionUIData.hasRunningJobs) {
-          // onExecutionEnd happens after all "onJobEnd"s
-          // So we should update the execution lists.
-          markExecutionFinished(executionId)
-        } else {
-          // There are some running jobs, onExecutionEnd happens before some 
"onJobEnd"s.
-          // Then we don't if the execution is successful, so let the last 
onJobEnd updates the
-          // execution lists.
-        }
+  def onExecutionStart(
+      executionId: Long,
+      description: String,
+      details: String,
+      physicalPlanDescription: String,
+      physicalPlanGraph: SparkPlanGraph,
+      time: Long): Unit = {
+    val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+      node.metrics.map(metric => metric.accumulatorId -> metric)
+    }
+
+    val executionUIData = new SQLExecutionUIData(executionId, description, 
details,
+      physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time)
+    synchronized {
+      activeExecutions(executionId) = executionUIData
+      _executionIdToData(executionId) = executionUIData
+    }
+  }
+
+  def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized {
+    _executionIdToData.get(executionId).foreach { executionUIData =>
+      executionUIData.completionTime = Some(time)
+      if (!executionUIData.hasRunningJobs) {
+        // onExecutionEnd happens after all "onJobEnd"s
+        // So we should update the execution lists.
+        markExecutionFinished(executionId)
+      } else {
+        // There are some running jobs, onExecutionEnd happens before some 
"onJobEnd"s.
+        // Then we don't if the execution is successful, so let the last 
onJobEnd updates the
+        // execution lists.
       }
     }
-    case _ => // Ignore
   }
 
   private def markExecutionFinished(executionId: Long): Unit = {
@@ -312,38 +289,6 @@ private[sql] class SQLListener(conf: SparkConf) extends 
SparkListener with Loggi
 
 }
 
-private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
-  extends SQLListener(conf) {
-
-  private var sqlTabAttached = false
-
-  override def onExecutorMetricsUpdate(
-      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = 
synchronized {
-    // Do nothing
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    updateTaskAccumulatorValues(
-      taskEnd.taskInfo.taskId,
-      taskEnd.stageId,
-      taskEnd.stageAttemptId,
-      taskEnd.taskInfo.accumulables.map { acc =>
-        (acc.id, new LongSQLMetricValue(acc.update.getOrElse("0").toLong))
-      }.toMap,
-      finishTask = true)
-  }
-
-  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-    case _: SparkListenerSQLExecutionStart =>
-      if (!sqlTabAttached) {
-        new SQLTab(this, sparkUI)
-        sqlTabAttached = true
-      }
-      super.onOtherEvent(event)
-    case _ => super.onOtherEvent(event)
-  }
-}
-
 /**
  * Represent all necessary data for an execution that will be used in Web UI.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 4f50b2e..9c27944 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.ui
 
+import java.util.concurrent.atomic.AtomicInteger
+
 import org.apache.spark.Logging
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
-  extends SparkUITab(sparkUI, "SQL") with Logging {
+  extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
 
   val parent = sparkUI
 
@@ -33,5 +35,13 @@ private[sql] class SQLTab(val listener: SQLListener, 
sparkUI: SparkUI)
 }
 
 private[sql] object SQLTab {
+
   private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
+
+  private val nextTabId = new AtomicInteger(0)
+
+  private def nextTabName: String = {
+    val nextId = nextTabId.getAndIncrement()
+    if (nextId == 0) "SQL" else s"SQL$nextId"
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 7af0ff0..f1fce54 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.execution.SparkPlanInfo
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
 
 /**
 * A graph used for storing information about the execution plan of a DataFrame.
@@ -48,27 +48,27 @@ private[sql] object SparkPlanGraph {
   /**
    * Build a SparkPlanGraph from the root of a SparkPlan tree.
    */
-  def apply(planInfo: SparkPlanInfo): SparkPlanGraph = {
+  def apply(plan: SparkPlan): SparkPlanGraph = {
     val nodeIdGenerator = new AtomicLong(0)
     val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
     val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
-    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
+    buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
     new SparkPlanGraph(nodes, edges)
   }
 
   private def buildSparkPlanGraphNode(
-      planInfo: SparkPlanInfo,
+      plan: SparkPlan,
       nodeIdGenerator: AtomicLong,
       nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
       edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
-    val metrics = planInfo.metrics.map { metric =>
-      SQLPlanMetric(metric.name, metric.accumulatorId,
-        SQLMetrics.getMetricParam(metric.metricParam))
+    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
+      SQLPlanMetric(metric.name.getOrElse(key), metric.id,
+        metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
     }
     val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), planInfo.nodeName, 
planInfo.simpleString, metrics)
+      nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, 
metrics)
     nodes += node
-    val childrenNodes = planInfo.children.map(
+    val childrenNodes = plan.children.map(
       child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
     for (child <- childrenNodes) {
       edges += SparkPlanGraphEdge(child.id, node.id)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 4f2cad1..82867ab 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -26,7 +26,6 @@ import org.apache.xbean.asm5.Opcodes._
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -83,8 +82,7 @@ class SQLMetricsSuite extends SparkFunSuite with 
SharedSQLContext {
     if (jobs.size == expectedNumOfJobs) {
       // If we can track all jobs, check the metric values
       val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
-      val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
-        df.queryExecution.executedPlan)).nodes.filter { node =>
+      val actualMetrics = 
SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
         expectedMetrics.contains(node.id)
       }.map { node =>
         val nodeMetrics = node.metrics.map { metric =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index f93d081..c15aac7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -21,10 +21,10 @@ import java.util.Properties
 
 import org.apache.spark.{SparkException, SparkContext, SparkConf, 
SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.sql.execution.metric.LongSQLMetricValue
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
-import org.apache.spark.sql.execution.metric.LongSQLMetricValue
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.test.SharedSQLContext
 
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
@@ -82,8 +82,7 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
     val executionId = 0
     val df = createTestDataFrame
     val accumulatorIds =
-      
SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
-        .nodes.flatMap(_.metrics.map(_.accumulatorId))
+      
SparkPlanGraph(df.queryExecution.executedPlan).nodes.flatMap(_.metrics.map(_.accumulatorId))
     // Assume all accumulators are long
     var accumulatorValue = 0L
     val accumulatorUpdates = accumulatorIds.map { id =>
@@ -91,13 +90,13 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
       (id, accumulatorValue)
     }.toMap
 
-    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+    listener.onExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
+      SparkPlanGraph(df.queryExecution.executedPlan),
+      System.currentTimeMillis())
 
     val executionUIData = listener.executionIdToData(0)
 
@@ -207,8 +206,7 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
       time = System.currentTimeMillis(),
       JobSucceeded
     ))
-    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
+    listener.onExecutionEnd(executionId, System.currentTimeMillis())
 
     assert(executionUIData.runningJobs.isEmpty)
     assert(executionUIData.succeededJobs === Seq(0))
@@ -221,20 +219,19 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
     val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
-    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+    listener.onExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
+      SparkPlanGraph(df.queryExecution.executedPlan),
+      System.currentTimeMillis())
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
       stageInfos = Nil,
       createProperties(executionId)))
-    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
+    listener.onExecutionEnd(executionId, System.currentTimeMillis())
     listener.onJobEnd(SparkListenerJobEnd(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -251,13 +248,13 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
     val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
-    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+    listener.onExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
+      SparkPlanGraph(df.queryExecution.executedPlan),
+      System.currentTimeMillis())
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -274,8 +271,7 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
       time = System.currentTimeMillis(),
       stageInfos = Nil,
       createProperties(executionId)))
-    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
+    listener.onExecutionEnd(executionId, System.currentTimeMillis())
     listener.onJobEnd(SparkListenerJobEnd(
       jobId = 1,
       time = System.currentTimeMillis(),
@@ -292,20 +288,19 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext {
     val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
-    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+    listener.onExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
+      SparkPlanGraph(df.queryExecution.executedPlan),
+      System.currentTimeMillis())
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
       stageInfos = Seq.empty,
       createProperties(executionId)))
-    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
+    listener.onExecutionEnd(executionId, System.currentTimeMillis())
     listener.onJobEnd(SparkListenerJobEnd(
       jobId = 0,
       time = System.currentTimeMillis(),

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5dee0f/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index e7b3765..963d10e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -42,7 +42,6 @@ trait SharedSQLContext extends SQLTestUtils {
    * Initialize the [[TestSQLContext]].
    */
   protected override def beforeAll(): Unit = {
-    SQLContext.clearSqlListener()
     if (_ctx == null) {
       _ctx = new TestSQLContext
     }

