Repository: spark
Updated Branches:
  refs/heads/master 8a0ed5a5e -> d3a1d9527


[SPARK-22786][SQL] only use AppStatusPlugin in history server

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/19681 we introduced a new interface 
called `AppStatusPlugin` to register listeners and set up the UI for both the 
live UI and the history UI.
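
For reference, the `AppStatusPlugin` trait being removed here (see the deleted 
`core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala` further down 
in this diff) looked roughly like the condensed sketch below; the `addListenerFn` 
callback and the `live` flag are what made it cover both live and replayed 
applications:

```scala
// Condensed from the deleted AppStatusPlugin.scala in this diff.
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.ui.SparkUI

private[spark] trait AppStatusPlugin {
  // Install listeners that populate `store`; `live` distinguishes a running
  // application from one being replayed by the history server.
  def setupListeners(
      conf: SparkConf,
      store: ElementTrackingStore,
      addListenerFn: SparkListener => Unit,
      live: Boolean): Unit

  // Install UI extensions (tabs, pages, etc.) on the given SparkUI.
  def setupUI(ui: SparkUI): Unit
}
```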

However, I think it's overkill for the live UI. For example, we should not 
register the `SQLListener` if users are not using SQL functions. Previously we 
registered the `SQLListener` and set up the SQL tab when the first 
`SparkSession` was created, which indicates that users are going to use SQL 
functions. But after #19681, the SQL listener is registered during 
`SparkContext` creation. The same reasoning applies to streaming as well.
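
With this change, the live-application wiring moves back to `SparkSession` 
creation: `SharedState` creates the SQL status store, registers the listener on 
the status queue, and adds the SQL tab only if the UI is enabled. A condensed 
excerpt of the `SharedState.scala` hunk below:

```scala
// Inside SharedState (sql/core), so the SQL listener and SQL tab only exist
// once a SparkSession has actually been created.
val statusStore: SQLAppStatusStore = {
  val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
  val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
  sparkContext.listenerBus.addToStatusQueue(listener)
  val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
  sparkContext.ui.foreach(new SQLTab(statusStore, _))
  statusStore
}
```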

I think we should keep the previous behavior and use this new interface only 
for the history server.

To reflect this change, I also rename the new interface to 
`AppHistoryServerPlugin`, as sketched below.
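
The history-server-only interface and its SQL implementation, both added as new 
files further down in this diff, are roughly as follows (combined into one 
sketch here for readability; in the tree they live in `core` and `sql/core` 
respectively):

```scala
// Condensed from the new AppHistoryServerPlugin.scala (core) and
// SQLHistoryServerPlugin.scala (sql/core) added in this diff.
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.ui.SparkUI

// core: replaces AppStatusPlugin and is used only when rebuilding a history UI.
private[spark] trait AppHistoryServerPlugin {
  // Listeners used to replay the event logs into `store`.
  def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener]
  // Sets up this plugin's part of the rebuilt history UI.
  def setupUI(ui: SparkUI): Unit
}

// sql/core: the SQL implementation of the plugin.
class SQLHistoryServerPlugin extends AppHistoryServerPlugin {
  override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] =
    Seq(new SQLAppStatusListener(conf, store, live = false))

  override def setupUI(ui: SparkUI): Unit = {
    // Only add the SQL tab if the replayed store actually contains SQL data.
    val sqlStatusStore = new SQLAppStatusStore(ui.store.store)
    if (sqlStatusStore.executionsCount() > 0) {
      new SQLTab(sqlStatusStore, ui)
    }
  }
}
```

The history server discovers implementations via `java.util.ServiceLoader`, so 
the SQL module registers `SQLHistoryServerPlugin` in 
`META-INF/services/org.apache.spark.status.AppHistoryServerPlugin` (also part of 
this diff).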

This PR also refines the tests for the SQL listener.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19981 from cloud-fan/listener.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3a1d952
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3a1d952
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3a1d952

Branch: refs/heads/master
Commit: d3a1d9527bcd6675cc45773f01d4558cf4b46b3d
Parents: 8a0ed5a
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Dec 22 01:08:13 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Dec 22 01:08:13 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  16 +-
 .../deploy/history/FsHistoryProvider.scala      |  14 +-
 .../spark/status/AppHistoryServerPlugin.scala   |  38 ++
 .../apache/spark/status/AppStatusListener.scala |   2 +-
 .../apache/spark/status/AppStatusPlugin.scala   |  71 ---
 .../apache/spark/status/AppStatusStore.scala    |  17 +-
 .../org/apache/spark/ui/StagePageSuite.scala    |  20 +-
 ...g.apache.spark.status.AppHistoryServerPlugin |   1 +
 .../org.apache.spark.status.AppStatusPlugin     |   1 -
 .../sql/execution/ui/SQLAppStatusListener.scala |  23 +-
 .../sql/execution/ui/SQLAppStatusStore.scala    |  62 +--
 .../execution/ui/SQLHistoryServerPlugin.scala   |  36 ++
 .../apache/spark/sql/internal/SharedState.scala |  18 +-
 .../sql/execution/metric/SQLMetricsSuite.scala  |   8 -
 .../execution/metric/SQLMetricsTestUtils.scala  |  10 +-
 .../ui/SQLAppStatusListenerSuite.scala          | 531 +++++++++++++++++++
 .../sql/execution/ui/SQLListenerSuite.scala     | 531 -------------------
 17 files changed, 668 insertions(+), 731 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 92e13ce..fcbeddd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
+import org.apache.spark.status.AppStatusStore
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
@@ -416,7 +416,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Initialize the app status store and listener before SparkEnv is created 
so that it gets
     // all events.
-    _statusStore = AppStatusStore.createLiveStore(conf, l => 
listenerBus.addToStatusQueue(l))
+    _statusStore = AppStatusStore.createLiveStore(conf)
+    listenerBus.addToStatusQueue(_statusStore.listener.get)
 
     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
@@ -445,14 +446,9 @@ class SparkContext(config: SparkConf) extends Logging {
         // For tests, do not enable the UI
         None
       }
-    _ui.foreach { ui =>
-      // Load any plugins that might want to modify the UI.
-      AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
-
-      // Bind the UI before starting the task scheduler to communicate
-      // the bound port to the cluster manager properly
-      ui.bind()
-    }
+    // Bind the UI before starting the task scheduler to communicate
+    // the bound port to the cluster manager properly
+    _ui.foreach(_.bind())
 
     _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index fa2c519..a299b79 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -44,7 +44,6 @@ import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
-import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -322,15 +321,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
         (new InMemoryStore(), true)
     }
 
+    val plugins = ServiceLoader.load(
+      classOf[AppHistoryServerPlugin], 
Utils.getContextOrSparkClassLoader).asScala
     val trackingStore = new ElementTrackingStore(kvstore, conf)
     if (needReplay) {
       val replayBus = new ReplayListenerBus()
       val listener = new AppStatusListener(trackingStore, conf, false,
         lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
       replayBus.addListener(listener)
-      AppStatusPlugin.loadPlugins().foreach { plugin =>
-        plugin.setupListeners(conf, trackingStore, l => 
replayBus.addListener(l), false)
-      }
+      for {
+        plugin <- plugins
+        listener <- plugin.createListeners(conf, trackingStore)
+      } replayBus.addListener(listener)
       try {
         val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
         replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
@@ -353,9 +355,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
       HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
       attempt.info.startTime.getTime(),
       attempt.info.appSparkVersion)
-    AppStatusPlugin.loadPlugins().foreach { plugin =>
-      plugin.setupUI(ui)
-    }
+    plugins.foreach(_.setupUI(ui))
 
     val loadedUI = LoadedAppUI(ui)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala 
b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala
new file mode 100644
index 0000000..d144a0e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.ui.SparkUI
+
+/**
+ * An interface for creating history listeners(to replay event logs) defined 
in other modules like
+ * SQL, and setup the UI of the plugin to rebuild the history UI.
+ */
+private[spark] trait AppHistoryServerPlugin {
+  /**
+   * Creates listeners to replay the event logs.
+   */
+  def createListeners(conf: SparkConf, store: ElementTrackingStore): 
Seq[SparkListener]
+
+  /**
+   * Sets up UI of this plugin to rebuild the history UI.
+   */
+  def setupUI(ui: SparkUI): Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 87eb84d..4db797e 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -48,7 +48,7 @@ private[spark] class AppStatusListener(
 
   import config._
 
-  private var sparkVersion = SPARK_VERSION
+  private val sparkVersion = SPARK_VERSION
   private var appInfo: v1.ApplicationInfo = null
   private var appSummary = new AppSummary(0, 0)
   private var coresPerTask: Int = 1

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala
deleted file mode 100644
index 4cada5c..0000000
--- a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.status
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.SparkListener
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
-import org.apache.spark.util.kvstore.KVStore
-
-/**
- * An interface that defines plugins for collecting and storing application 
state.
- *
- * The plugin implementations are invoked for both live and replayed 
applications. For live
- * applications, it's recommended that plugins defer creation of UI tabs until 
there's actual
- * data to be shown.
- */
-private[spark] trait AppStatusPlugin {
-
-  /**
-   * Install listeners to collect data about the running application and 
populate the given
-   * store.
-   *
-   * @param conf The Spark configuration.
-   * @param store The KVStore where to keep application data.
-   * @param addListenerFn Function to register listeners with a bus.
-   * @param live Whether this is a live application (or an application being 
replayed by the
-   *             HistoryServer).
-   */
-  def setupListeners(
-      conf: SparkConf,
-      store: ElementTrackingStore,
-      addListenerFn: SparkListener => Unit,
-      live: Boolean): Unit
-
-  /**
-   * Install any needed extensions (tabs, pages, etc) to a Spark UI. The 
plugin can detect whether
-   * the app is live or replayed by looking at the UI's SparkContext field 
`sc`.
-   *
-   * @param ui The Spark UI instance for the application.
-   */
-  def setupUI(ui: SparkUI): Unit
-
-}
-
-private[spark] object AppStatusPlugin {
-
-  def loadPlugins(): Iterable[AppStatusPlugin] = {
-    ServiceLoader.load(classOf[AppStatusPlugin], 
Utils.getContextOrSparkClassLoader).asScala
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 9987419..5a942f5 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,16 +17,14 @@
 
 package org.apache.spark.status
 
-import java.io.File
-import java.util.{Arrays, List => JList}
+import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
-import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.scope._
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.Distribution
 import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 
 /**
@@ -34,7 +32,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
  */
 private[spark] class AppStatusStore(
     val store: KVStore,
-    listener: Option[AppStatusListener] = None) {
+    val listener: Option[AppStatusListener] = None) {
 
   def applicationInfo(): v1.ApplicationInfo = {
     store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
@@ -346,17 +344,10 @@ private[spark] object AppStatusStore {
 
   /**
    * Create an in-memory store for a live application.
-   *
-   * @param conf Configuration.
-   * @param addListenerFn Function to register a listener with a bus.
    */
-  def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): 
AppStatusStore = {
+  def createLiveStore(conf: SparkConf): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
     val listener = new AppStatusListener(store, conf, true)
-    addListenerFn(listener)
-    AppStatusPlugin.loadPlugins().foreach { p =>
-      p.setupListeners(conf, store, addListenerFn, true)
-    }
     new AppStatusStore(store, listener = Some(listener))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 46932a0..661d0d4 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -22,7 +22,6 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.mockito.Matchers.anyString
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 
 import org.apache.spark._
@@ -30,7 +29,6 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.ui.jobs.{StagePage, StagesTab}
-import org.apache.spark.util.Utils
 
 class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -55,12 +53,12 @@ class StagePageSuite extends SparkFunSuite with 
LocalSparkContext {
    * This also runs a dummy stage to populate the page with useful content.
    */
   private def renderStagePage(conf: SparkConf): Seq[Node] = {
-    val bus = new ReplayListenerBus()
-    val store = AppStatusStore.createLiveStore(conf, l => bus.addListener(l))
+    val statusStore = AppStatusStore.createLiveStore(conf)
+    val listener = statusStore.listener.get
 
     try {
       val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS)
-      when(tab.store).thenReturn(store)
+      when(tab.store).thenReturn(statusStore)
 
       val request = mock(classOf[HttpServletRequest])
       when(tab.conf).thenReturn(conf)
@@ -68,7 +66,7 @@ class StagePageSuite extends SparkFunSuite with 
LocalSparkContext {
       when(tab.headerTabs).thenReturn(Seq.empty)
       when(request.getParameter("id")).thenReturn("0")
       when(request.getParameter("attempt")).thenReturn("0")
-      val page = new StagePage(tab, store)
+      val page = new StagePage(tab, statusStore)
 
       // Simulate a stage in job progress listener
       val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, 
"details")
@@ -77,17 +75,17 @@ class StagePageSuite extends SparkFunSuite with 
LocalSparkContext {
         taskId =>
           val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", 
TaskLocality.ANY,
             false)
-          bus.postToAll(SparkListenerStageSubmitted(stageInfo))
-          bus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
+          listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
+          listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
           taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
           val taskMetrics = TaskMetrics.empty
           taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
-          bus.postToAll(SparkListenerTaskEnd(0, 0, "result", Success, 
taskInfo, taskMetrics))
+          listener.onTaskEnd(SparkListenerTaskEnd(0, 0, "result", Success, 
taskInfo, taskMetrics))
       }
-      bus.postToAll(SparkListenerStageCompleted(stageInfo))
+      listener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
       page.render(request)
     } finally {
-      store.close()
+      statusStore.close()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
new file mode 100644
index 0000000..0bba2f8
--- /dev/null
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
@@ -0,0 +1 @@
+org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin
deleted file mode 100644
index ac6d7f6..0000000
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.spark.sql.execution.ui.SQLAppStatusPlugin

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index cf0000c..aa78fa0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -30,15 +30,11 @@ import org.apache.spark.sql.execution.metric._
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
 import org.apache.spark.status.config._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.kvstore.KVStore
 
-private[sql] class SQLAppStatusListener(
+class SQLAppStatusListener(
     conf: SparkConf,
     kvstore: ElementTrackingStore,
-    live: Boolean,
-    ui: Option[SparkUI] = None)
-  extends SparkListener with Logging {
+    live: Boolean) extends SparkListener with Logging {
 
   // How often to flush intermediate state of a live execution to the store. 
When replaying logs,
   // never flush (only do the very last write).
@@ -50,7 +46,10 @@ private[sql] class SQLAppStatusListener(
   private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
   private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
 
-  private var uiInitialized = false
+  // Returns true if this listener has no live data. Exposed for tests only.
+  private[sql] def noLiveData(): Boolean = {
+    liveExecutions.isEmpty && stageMetrics.isEmpty
+  }
 
   kvstore.addTrigger(classOf[SQLExecutionUIData], 
conf.get(UI_RETAINED_EXECUTIONS)) { count =>
     cleanupExecutions(count)
@@ -230,14 +229,6 @@ private[sql] class SQLAppStatusListener(
   }
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
-    // Install the SQL tab in a live app if it hasn't been initialized yet.
-    if (!uiInitialized) {
-      ui.foreach { _ui =>
-        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
-      }
-      uiInitialized = true
-    }
-
     val SparkListenerSQLExecutionStart(executionId, description, details,
       physicalPlanDescription, sparkPlanInfo, time) = event
 
@@ -389,7 +380,7 @@ private class LiveStageMetrics(
     val accumulatorIds: Array[Long],
     val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
 
-private[sql] class LiveTaskMetrics(
+private class LiveTaskMetrics(
     val ids: Array[Long],
     val values: Array[Long],
     val succeeded: Boolean)

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 7fd5f73..910f2e5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -25,21 +25,17 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
-import org.apache.spark.{JobExecutionStatus, SparkConf}
-import org.apache.spark.scheduler.SparkListener
-import org.apache.spark.status.{AppStatusPlugin, ElementTrackingStore}
+import org.apache.spark.JobExecutionStatus
 import org.apache.spark.status.KVUtils.KVIndexParam
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.KVStore
 
 /**
  * Provides a view of a KVStore with methods that make it easy to query 
SQL-specific state. There's
  * no state kept in this class, so it's ok to have multiple instances of it in 
an application.
  */
-private[sql] class SQLAppStatusStore(
+class SQLAppStatusStore(
     store: KVStore,
-    listener: Option[SQLAppStatusListener] = None) {
+    val listener: Option[SQLAppStatusListener] = None) {
 
   def executionsList(): Seq[SQLExecutionUIData] = {
     store.view(classOf[SQLExecutionUIData]).asScala.toSeq
@@ -74,48 +70,9 @@ private[sql] class SQLAppStatusStore(
   def planGraph(executionId: Long): SparkPlanGraph = {
     store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph()
   }
-
-}
-
-/**
- * An AppStatusPlugin for handling the SQL UI and listeners.
- */
-private[sql] class SQLAppStatusPlugin extends AppStatusPlugin {
-
-  override def setupListeners(
-      conf: SparkConf,
-      store: ElementTrackingStore,
-      addListenerFn: SparkListener => Unit,
-      live: Boolean): Unit = {
-    // For live applications, the listener is installed in [[setupUI]]. This 
also avoids adding
-    // the listener when the UI is disabled. Force installation during 
testing, though.
-    if (!live || Utils.isTesting) {
-      val listener = new SQLAppStatusListener(conf, store, live, None)
-      addListenerFn(listener)
-    }
-  }
-
-  override def setupUI(ui: SparkUI): Unit = {
-    ui.sc match {
-      case Some(sc) =>
-        // If this is a live application, then install a listener that will 
enable the SQL
-        // tab as soon as there's a SQL event posted to the bus.
-        val listener = new SQLAppStatusListener(sc.conf,
-          ui.store.store.asInstanceOf[ElementTrackingStore], true, Some(ui))
-        sc.listenerBus.addToStatusQueue(listener)
-
-      case _ =>
-        // For a replayed application, only add the tab if the store already 
contains SQL data.
-        val sqlStore = new SQLAppStatusStore(ui.store.store)
-        if (sqlStore.executionsCount() > 0) {
-          new SQLTab(sqlStore, ui)
-        }
-    }
-  }
-
 }
 
-private[sql] class SQLExecutionUIData(
+class SQLExecutionUIData(
     @KVIndexParam val executionId: Long,
     val description: String,
     val details: String,
@@ -133,10 +90,9 @@ private[sql] class SQLExecutionUIData(
      * from the SQL listener instance.
      */
     @JsonDeserialize(keyAs = classOf[JLong])
-    val metricValues: Map[Long, String]
-    )
+    val metricValues: Map[Long, String])
 
-private[sql] class SparkPlanGraphWrapper(
+class SparkPlanGraphWrapper(
     @KVIndexParam val executionId: Long,
     val nodes: Seq[SparkPlanGraphNodeWrapper],
     val edges: Seq[SparkPlanGraphEdge]) {
@@ -147,7 +103,7 @@ private[sql] class SparkPlanGraphWrapper(
 
 }
 
-private[sql] class SparkPlanGraphClusterWrapper(
+class SparkPlanGraphClusterWrapper(
     val id: Long,
     val name: String,
     val desc: String,
@@ -163,7 +119,7 @@ private[sql] class SparkPlanGraphClusterWrapper(
 }
 
 /** Only one of the values should be set. */
-private[sql] class SparkPlanGraphNodeWrapper(
+class SparkPlanGraphNodeWrapper(
     val node: SparkPlanGraphNode,
     val cluster: SparkPlanGraphClusterWrapper) {
 
@@ -174,7 +130,7 @@ private[sql] class SparkPlanGraphNodeWrapper(
 
 }
 
-private[sql] case class SQLPlanMetric(
+case class SQLPlanMetric(
     name: String,
     accumulatorId: Long,
     metricType: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala
new file mode 100644
index 0000000..522d0cf
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
+import org.apache.spark.ui.SparkUI
+
+class SQLHistoryServerPlugin extends AppHistoryServerPlugin {
+  override def createListeners(conf: SparkConf, store: ElementTrackingStore): 
Seq[SparkListener] = {
+    Seq(new SQLAppStatusListener(conf, store, live = false))
+  }
+
+  override def setupUI(ui: SparkUI): Unit = {
+    val sqlStatusStore = new SQLAppStatusStore(ui.store.store)
+    if (sqlStatusStore.executionsCount() > 0) {
+      new SQLTab(sqlStatusStore, ui)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 3e479fa..baea4ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -28,11 +28,12 @@ import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
+import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, 
SQLAppStatusStore, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 
@@ -83,6 +84,19 @@ private[sql] class SharedState(val sparkContext: 
SparkContext) extends Logging {
   val cacheManager: CacheManager = new CacheManager
 
   /**
+   * A status store to query SQL status/metrics of this Spark application, 
based on SQL-specific
+   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
+   */
+  val statusStore: SQLAppStatusStore = {
+    val kvStore = 
sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
+    val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = 
true)
+    sparkContext.listenerBus.addToStatusQueue(listener)
+    val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
+    sparkContext.ui.foreach(new SQLTab(statusStore, _))
+    statusStore
+  }
+
+  /**
    * A catalog that interacts with external systems.
    */
   lazy val externalCatalog: ExternalCatalog = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d588af3..fc34833 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -33,14 +33,6 @@ import org.apache.spark.util.{AccumulatorContext, 
JsonProtocol}
 class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with 
SharedSQLContext {
   import testImplicits._
 
-  private def statusStore: SQLAppStatusStore = {
-    new SQLAppStatusStore(sparkContext.statusStore.store)
-  }
-
-  private def currentExecutionIds(): Set[Long] = {
-    statusStore.executionsList.map(_.executionId).toSet
-  }
-
   /**
    * Generates a `DataFrame` by filling randomly generated bytes for hash 
collision.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index d89c4b1..122d287 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -31,17 +31,14 @@ import org.apache.spark.util.Utils
 
 
 trait SQLMetricsTestUtils extends SQLTestUtils {
-
   import testImplicits._
 
-  private def statusStore: SQLAppStatusStore = {
-    new SQLAppStatusStore(sparkContext.statusStore.store)
-  }
-
-  private def currentExecutionIds(): Set[Long] = {
+  protected def currentExecutionIds(): Set[Long] = {
     statusStore.executionsList.map(_.executionId).toSet
   }
 
+  protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore
+
   /**
    * Get execution metrics for the SQL execution and verify metrics values.
    *
@@ -57,7 +54,6 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
     assert(executionIds.size == 1)
     val executionId = executionIds.head
 
-    val executionData = statusStore.execution(executionId).get
     val executedNode = statusStore.planGraph(executionId).nodes.head
 
     val metricsNames = Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
new file mode 100644
index 0000000..5ebbeb4
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.util.Properties
+
+import scala.collection.mutable.ListBuffer
+
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark._
+import org.apache.spark.LocalSparkContext._
+import org.apache.spark.internal.config
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, 
SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.status.config._
+import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, 
LongAccumulator}
+import org.apache.spark.util.kvstore.InMemoryStore
+
+
+class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext 
with JsonTestUtils {
+  import testImplicits._
+
+  override protected def sparkConf = {
+    super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 
0L).set(ASYNC_TRACKING_ENABLED, false)
+  }
+
+  private def createTestDataFrame: DataFrame = {
+    Seq(
+      (1, 1),
+      (2, 2)
+    ).toDF().filter("_1 > 1")
+  }
+
+  private def createProperties(executionId: Long): Properties = {
+    val properties = new Properties()
+    properties.setProperty(SQLExecution.EXECUTION_ID_KEY, executionId.toString)
+    properties
+  }
+
+  private def createStageInfo(stageId: Int, attemptId: Int): StageInfo = {
+    new StageInfo(stageId = stageId,
+      attemptId = attemptId,
+      // The following fields are not used in tests
+      name = "",
+      numTasks = 0,
+      rddInfos = Nil,
+      parentIds = Nil,
+      details = "")
+  }
+
+  private def createTaskInfo(
+      taskId: Int,
+      attemptNumber: Int,
+      accums: Map[Long, Long] = Map.empty): TaskInfo = {
+    val info = new TaskInfo(
+      taskId = taskId,
+      attemptNumber = attemptNumber,
+      // The following fields are not used in tests
+      index = 0,
+      launchTime = 0,
+      executorId = "",
+      host = "",
+      taskLocality = null,
+      speculative = false)
+    info.markFinished(TaskState.FINISHED, 1L)
+    info.setAccumulables(createAccumulatorInfos(accums))
+    info
+  }
+
+  private def createAccumulatorInfos(accumulatorUpdates: Map[Long, Long]): 
Seq[AccumulableInfo] = {
+    accumulatorUpdates.map { case (id, value) =>
+      val acc = new LongAccumulator
+      acc.metadata = AccumulatorMetadata(id, None, false)
+      acc.toInfo(Some(value), None)
+    }.toSeq
+  }
+
+  private def assertJobs(
+      exec: Option[SQLExecutionUIData],
+      running: Seq[Int] = Nil,
+      completed: Seq[Int] = Nil,
+      failed: Seq[Int] = Nil): Unit = {
+    val actualRunning = new ListBuffer[Int]()
+    val actualCompleted = new ListBuffer[Int]()
+    val actualFailed = new ListBuffer[Int]()
+
+    exec.get.jobs.foreach { case (jobId, jobStatus) =>
+      jobStatus match {
+        case JobExecutionStatus.RUNNING => actualRunning += jobId
+        case JobExecutionStatus.SUCCEEDED => actualCompleted += jobId
+        case JobExecutionStatus.FAILED => actualFailed += jobId
+        case _ => fail(s"Unexpected status $jobStatus")
+      }
+    }
+
+    assert(actualRunning.sorted === running)
+    assert(actualCompleted.sorted === completed)
+    assert(actualFailed.sorted === failed)
+  }
+
+  private def createStatusStore(): SQLAppStatusStore = {
+    val conf = sparkContext.conf
+    val store = new ElementTrackingStore(new InMemoryStore, conf)
+    val listener = new SQLAppStatusListener(conf, store, live = true)
+    new SQLAppStatusStore(store, Some(listener))
+  }
+
+  test("basic") {
+    def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): 
Unit = {
+      assert(actual.size == expected.size)
+      expected.foreach { case (id, value) =>
+        // The values in actual can be SQL metrics meaning that they contain 
additional formatting
+        // when converted to string. Verify that they start with the expected 
value.
+        // TODO: this is brittle. There is no requirement that the actual 
string needs to start
+        // with the accumulator value.
+        assert(actual.contains(id))
+        val v = actual.get(id).get.trim
+        assert(v.startsWith(value.toString), s"Wrong value for accumulator 
$id")
+      }
+    }
+
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    val accumulatorIds =
+      
SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
+        .allNodes.flatMap(_.metrics.map(_.accumulatorId))
+    // Assume all accumulators are long
+    var accumulatorValue = 0L
+    val accumulatorUpdates = accumulatorIds.map { id =>
+      accumulatorValue += 1L
+      (id, accumulatorValue)
+    }.toMap
+
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      stageInfos = Seq(
+        createStageInfo(0, 0),
+        createStageInfo(1, 0)
+      ),
+      createProperties(executionId)))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 
0)))
+
+    assert(statusStore.executionMetrics(executionId).isEmpty)
+
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", 
Seq(
+      // (task id, stage id, stage attempt, accum updates)
+      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
+    )))
+
+    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 2))
+
+    // Driver accumulator updates don't belong to this execution should be 
filtered and no
+    // exception will be thrown.
+    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
+
+    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 2))
+
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", 
Seq(
+      // (task id, stage id, stage attempt, accum updates)
+      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
+    )))
+
+    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 3))
+
+    // Retrying a stage should reset the metrics
+    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 
1)))
+
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", 
Seq(
+      // (task id, stage id, stage attempt, accum updates)
+      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
+      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
+    )))
+
+    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 2))
+
+    // Ignore the task end for the first attempt
+    listener.onTaskEnd(SparkListenerTaskEnd(
+      stageId = 0,
+      stageAttemptId = 0,
+      taskType = "",
+      reason = null,
+      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
+      null))
+
+    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 2))
+
+    // Finish two tasks
+    listener.onTaskEnd(SparkListenerTaskEnd(
+      stageId = 0,
+      stageAttemptId = 1,
+      taskType = "",
+      reason = null,
+      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
+      null))
+    listener.onTaskEnd(SparkListenerTaskEnd(
+      stageId = 0,
+      stageAttemptId = 1,
+      taskType = "",
+      reason = null,
+      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+      null))
+
+    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 5))
+
+    // Summit a new stage
+    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 
0)))
+
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", 
Seq(
+      // (task id, stage id, stage attempt, accum updates)
+      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
+      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
+    )))
+
+    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 7))
+
+    // Finish two tasks
+    listener.onTaskEnd(SparkListenerTaskEnd(
+      stageId = 1,
+      stageAttemptId = 0,
+      taskType = "",
+      reason = null,
+      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+      null))
+    listener.onTaskEnd(SparkListenerTaskEnd(
+      stageId = 1,
+      stageAttemptId = 0,
+      taskType = "",
+      reason = null,
+      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+      null))
+
+    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 11))
+
+    assertJobs(statusStore.execution(executionId), running = Seq(0))
+
+    listener.onJobEnd(SparkListenerJobEnd(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      JobSucceeded
+    ))
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+
+    assertJobs(statusStore.execution(executionId), completed = Seq(0))
+
+    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 11))
+  }
+
+  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      stageInfos = Nil,
+      createProperties(executionId)))
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+    listener.onJobEnd(SparkListenerJobEnd(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      JobSucceeded
+    ))
+
+    assertJobs(statusStore.execution(executionId), completed = Seq(0))
+  }
+
+  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      stageInfos = Nil,
+      createProperties(executionId)))
+    listener.onJobEnd(SparkListenerJobEnd(
+        jobId = 0,
+        time = System.currentTimeMillis(),
+        JobSucceeded
+    ))
+
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 1,
+      time = System.currentTimeMillis(),
+      stageInfos = Nil,
+      createProperties(executionId)))
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+    listener.onJobEnd(SparkListenerJobEnd(
+      jobId = 1,
+      time = System.currentTimeMillis(),
+      JobSucceeded
+    ))
+
+    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
+  }
+
+  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
+    val statusStore = createStatusStore()
+    val listener = statusStore.listener.get
+
+    val executionId = 0
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    listener.onJobStart(SparkListenerJobStart(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      stageInfos = Seq.empty,
+      createProperties(executionId)))
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
+    listener.onJobEnd(SparkListenerJobEnd(
+      jobId = 0,
+      time = System.currentTimeMillis(),
+      JobFailed(new RuntimeException("Oops"))
+    ))
+
+    assertJobs(statusStore.execution(executionId), failed = Seq(0))
+  }
+
+  test("SPARK-11126: no memory leak when running non SQL jobs") {
+    val listener = spark.sharedState.statusStore.listener.get
+    // At the beginning of this test case, there should be no live data in the 
listener.
+    assert(listener.noLiveData())
+    spark.sparkContext.parallelize(1 to 10).foreach(i => ())
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+    // Listener should ignore the non-SQL stages, as the stage data are only 
removed when SQL
+    // execution ends, which will not be triggered for non-SQL jobs.
+    assert(listener.noLiveData())
+  }
+
+  test("driver side SQL metrics") {
+    val statusStore = spark.sharedState.statusStore
+    val oldCount = statusStore.executionsList().size
+
+    val expectedAccumValue = 12345
+    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
+    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
+      override lazy val sparkPlan = physicalPlan
+      override lazy val executedPlan = physicalPlan
+    }
+
+    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
+      physicalPlan.execute().collect()
+    }
+
+    // Wait until the new execution is started and being tracked.
+    while (statusStore.executionsCount() < oldCount) {
+      Thread.sleep(100)
+    }
+
+    // Wait for listener to finish computing the metrics for the execution.
+    while (statusStore.executionsList().last.metricValues == null) {
+      Thread.sleep(100)
+    }
+
+    val execId = statusStore.executionsList().last.executionId
+    val metrics = statusStore.executionMetrics(execId)
+    val driverMetric = physicalPlan.metrics("dummy")
+    val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, 
Seq(expectedAccumValue))
+
+    assert(metrics.contains(driverMetric.id))
+    assert(metrics(driverMetric.id) === expectedValue)
+  }
+
+  test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol 
(SPARK-18462)") {
+    val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
+    val json = JsonProtocol.sparkEventToJson(event)
+    assertValidDataInJson(json,
+      parse("""
+        |{
+        |  "Event": 
"org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+        |  "executionId": 1,
+        |  "accumUpdates": [[2,3]]
+        |}
+      """.stripMargin))
+    JsonProtocol.sparkEventFromJson(json) match {
+      case SparkListenerDriverAccumUpdates(executionId, accums) =>
+        assert(executionId == 1L)
+        accums.foreach { case (a, b) =>
+          assert(a == 2L)
+          assert(b == 3L)
+        }
+    }
+
+    // Test a case where the numbers in the JSON can only fit in longs:
+    val longJson = parse(
+      """
+        |{
+        |  "Event": 
"org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
+        |  "executionId": 4294967294,
+        |  "accumUpdates": [[4294967294,3]]
+        |}
+      """.stripMargin)
+    JsonProtocol.sparkEventFromJson(longJson) match {
+      case SparkListenerDriverAccumUpdates(executionId, accums) =>
+        assert(executionId == 4294967294L)
+        accums.foreach { case (a, b) =>
+          assert(a == 4294967294L)
+          assert(b == 3L)
+        }
+    }
+  }
+
+}
+
+
+/**
+ * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a 
[[SQLMetrics]]
+ * on the driver.
+ */
+private case class MyPlan(sc: SparkContext, expectedValue: Long) extends 
LeafExecNode {
+  override def sparkContext: SparkContext = sc
+  override def output: Seq[Attribute] = Seq()
+
+  override val metrics: Map[String, SQLMetric] = Map(
+    "dummy" -> SQLMetrics.createMetric(sc, "dummy"))
+
+  override def doExecute(): RDD[InternalRow] = {
+    longMetric("dummy") += expectedValue
+
+    SQLMetrics.postDriverMetricUpdates(
+      sc,
+      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
+      metrics.values.toSeq)
+    sc.emptyRDD
+  }
+}
+
+
+class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
+
+  test("no memory leak") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this 
test quickly
+      .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run 
this test quickly
+      .set(ASYNC_TRACKING_ENABLED, false)
+    withSpark(new SparkContext(conf)) { sc =>
+      quietly {
+        val spark = new SparkSession(sc)
+        import spark.implicits._
+        // Run 100 successful executions and 100 failed executions.
+        // Each execution only has one job and one stage.
+        for (i <- 0 until 100) {
+          val df = Seq(
+            (1, 1),
+            (2, 2)
+          ).toDF()
+          df.collect()
+          try {
+            df.foreach(_ => throw new RuntimeException("Oops"))
+          } catch {
+            case e: SparkException => // This is expected for a failed job
+          }
+        }
+        sc.listenerBus.waitUntilEmpty(10000)
+        val statusStore = spark.sharedState.statusStore
+        assert(statusStore.executionsCount() <= 50)
+        // No live data should be left behind after all executions end.
+        assert(statusStore.listener.get.noLiveData())
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
deleted file mode 100644
index 9329506..0000000
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.ui
-
-import java.util.Properties
-
-import scala.collection.mutable.ListBuffer
-
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark._
-import org.apache.spark.LocalSparkContext._
-import org.apache.spark.internal.config
-import org.apache.spark.rdd.RDD
-import org.apache.spark.scheduler._
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, 
SparkPlanInfo, SQLExecution}
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.status.ElementTrackingStore
-import org.apache.spark.status.config._
-import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, 
LongAccumulator}
-import org.apache.spark.util.kvstore.InMemoryStore
-
-class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
-  import testImplicits._
-
-  override protected def sparkConf = {
-    super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L).set(ASYNC_TRACKING_ENABLED, false)
-  }
-
-  private def createTestDataFrame: DataFrame = {
-    Seq(
-      (1, 1),
-      (2, 2)
-    ).toDF().filter("_1 > 1")
-  }
-
-  private def createProperties(executionId: Long): Properties = {
-    val properties = new Properties()
-    properties.setProperty(SQLExecution.EXECUTION_ID_KEY, executionId.toString)
-    properties
-  }
-
-  private def createStageInfo(stageId: Int, attemptId: Int): StageInfo = new StageInfo(
-    stageId = stageId,
-    attemptId = attemptId,
-    // The following fields are not used in tests
-    name = "",
-    numTasks = 0,
-    rddInfos = Nil,
-    parentIds = Nil,
-    details = ""
-  )
-
-  private def createTaskInfo(
-      taskId: Int,
-      attemptNumber: Int,
-      accums: Map[Long, Long] = Map()): TaskInfo = {
-    val info = new TaskInfo(
-      taskId = taskId,
-      attemptNumber = attemptNumber,
-      // The following fields are not used in tests
-      index = 0,
-      launchTime = 0,
-      executorId = "",
-      host = "",
-      taskLocality = null,
-      speculative = false)
-    info.markFinished(TaskState.FINISHED, 1L)
-    info.setAccumulables(createAccumulatorInfos(accums))
-    info
-  }
-
-  private def createAccumulatorInfos(accumulatorUpdates: Map[Long, Long]): Seq[AccumulableInfo] = {
-    accumulatorUpdates.map { case (id, value) =>
-      val acc = new LongAccumulator
-      acc.metadata = AccumulatorMetadata(id, None, false)
-      acc.toInfo(Some(value), None)
-    }.toSeq
-  }
-
-  /** Return the shared SQL store from the active SparkSession. */
-  private def statusStore: SQLAppStatusStore =
-    new SQLAppStatusStore(spark.sparkContext.statusStore.store)
-
-  /**
-   * Runs a test with a temporary SQLAppStatusStore tied to a listener bus. Events can be sent to
-   * the listener bus to update the store, and all data will be cleaned up at the end of the test.
-   */
-  private def sqlStoreTest(name: String)
-      (fn: (SQLAppStatusStore, SparkListenerBus) => Unit): Unit = {
-    test(name) {
-      val conf = sparkConf
-      val store = new ElementTrackingStore(new InMemoryStore(), conf)
-      val bus = new ReplayListenerBus()
-      val listener = new SQLAppStatusListener(conf, store, true)
-      bus.addListener(listener)
-      store.close(false)
-      val sqlStore = new SQLAppStatusStore(store, Some(listener))
-      fn(sqlStore, bus)
-    }
-  }
-
-  sqlStoreTest("basic") { (store, bus) =>
-    def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): Unit = {
-      assert(actual.size == expected.size)
-      expected.foreach { case (id, value) =>
-        // The values in actual can be SQL metrics meaning that they contain additional formatting
-        // when converted to string. Verify that they start with the expected value.
-        // TODO: this is brittle. There is no requirement that the actual string needs to start
-        // with the accumulator value.
-        assert(actual.contains(id))
-        val v = actual.get(id).get.trim
-        assert(v.startsWith(value.toString), s"Wrong value for accumulator $id")
-      }
-    }
-
-    val executionId = 0
-    val df = createTestDataFrame
-    val accumulatorIds =
-      SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
-        .allNodes.flatMap(_.metrics.map(_.accumulatorId))
-    // Assume all accumulators are long
-    var accumulatorValue = 0L
-    val accumulatorUpdates = accumulatorIds.map { id =>
-      accumulatorValue += 1L
-      (id, accumulatorValue)
-    }.toMap
-
-    bus.postToAll(SparkListenerSQLExecutionStart(
-      executionId,
-      "test",
-      "test",
-      df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
-
-    bus.postToAll(SparkListenerJobStart(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      stageInfos = Seq(
-        createStageInfo(0, 0),
-        createStageInfo(1, 0)
-      ),
-      createProperties(executionId)))
-    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
-
-    assert(store.executionMetrics(0).isEmpty)
-
-    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, accum updates)
-      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
-    )))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
-
-    // Driver accumulator updates that don't belong to this execution should be filtered out and
-    // no exception will be thrown.
-    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
-
-    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, accum updates)
-      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
-    )))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
-
-    // Retrying a stage should reset the metrics
-    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
-
-    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, accum updates)
-      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
-    )))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
-
-    // Ignore the task end for the first attempt
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 0,
-      stageAttemptId = 0,
-      taskType = "",
-      reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
-      null))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
-
-    // Finish two tasks
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 0,
-      stageAttemptId = 1,
-      taskType = "",
-      reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
-      null))
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 0,
-      stageAttemptId = 1,
-      taskType = "",
-      reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
-      null))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
-
-    // Submit a new stage
-    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
-
-    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, accum updates)
-      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
-    )))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
-
-    // Finish two tasks
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 1,
-      stageAttemptId = 0,
-      taskType = "",
-      reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
-      null))
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 1,
-      stageAttemptId = 0,
-      taskType = "",
-      reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
-      null))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
-
-    assertJobs(store.execution(0), running = Seq(0))
-
-    bus.postToAll(SparkListenerJobEnd(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      JobSucceeded
-    ))
-    bus.postToAll(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
-
-    assertJobs(store.execution(0), completed = Seq(0))
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
-  }
-
-  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
-    val executionId = 0
-    val df = createTestDataFrame
-    bus.postToAll(SparkListenerSQLExecutionStart(
-      executionId,
-      "test",
-      "test",
-      df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobStart(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      stageInfos = Nil,
-      createProperties(executionId)))
-    bus.postToAll(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobEnd(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      JobSucceeded
-    ))
-
-    assertJobs(store.execution(0), completed = Seq(0))
-  }
-
-  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
-    val executionId = 0
-    val df = createTestDataFrame
-    bus.postToAll(SparkListenerSQLExecutionStart(
-      executionId,
-      "test",
-      "test",
-      df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobStart(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      stageInfos = Nil,
-      createProperties(executionId)))
-    bus.postToAll(SparkListenerJobEnd(
-        jobId = 0,
-        time = System.currentTimeMillis(),
-        JobSucceeded
-    ))
-
-    bus.postToAll(SparkListenerJobStart(
-      jobId = 1,
-      time = System.currentTimeMillis(),
-      stageInfos = Nil,
-      createProperties(executionId)))
-    bus.postToAll(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobEnd(
-      jobId = 1,
-      time = System.currentTimeMillis(),
-      JobSucceeded
-    ))
-
-    assertJobs(store.execution(0), completed = Seq(0, 1))
-  }
-
-  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
-    val executionId = 0
-    val df = createTestDataFrame
-    bus.postToAll(SparkListenerSQLExecutionStart(
-      executionId,
-      "test",
-      "test",
-      df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobStart(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      stageInfos = Seq.empty,
-      createProperties(executionId)))
-    bus.postToAll(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobEnd(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      JobFailed(new RuntimeException("Oops"))
-    ))
-
-    assertJobs(store.execution(0), failed = Seq(0))
-  }
-
-  test("SPARK-11126: no memory leak when running non SQL jobs") {
-    val previousStageNumber = statusStore.executionsList().size
-    spark.sparkContext.parallelize(1 to 10).foreach(i => ())
-    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-    // listener should ignore the non SQL stage
-    assert(statusStore.executionsList().size == previousStageNumber)
-
-    spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
-    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-    // listener should save the SQL stage
-    assert(statusStore.executionsList().size == previousStageNumber + 1)
-  }
-
-  test("driver side SQL metrics") {
-    val oldCount = statusStore.executionsList().size
-    val expectedAccumValue = 12345L
-    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
-    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
-      override lazy val sparkPlan = physicalPlan
-      override lazy val executedPlan = physicalPlan
-    }
-
-    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
-      physicalPlan.execute().collect()
-    }
-
-    while (statusStore.executionsList().size < oldCount) {
-      Thread.sleep(100)
-    }
-
-    // Wait for listener to finish computing the metrics for the execution.
-    while (statusStore.executionsList().last.metricValues == null) {
-      Thread.sleep(100)
-    }
-
-    val execId = statusStore.executionsList().last.executionId
-    val metrics = statusStore.executionMetrics(execId)
-    val driverMetric = physicalPlan.metrics("dummy")
-    val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Seq(expectedAccumValue))
-
-    assert(metrics.contains(driverMetric.id))
-    assert(metrics(driverMetric.id) === expectedValue)
-  }
-
-  private def assertJobs(
-      exec: Option[SQLExecutionUIData],
-      running: Seq[Int] = Nil,
-      completed: Seq[Int] = Nil,
-      failed: Seq[Int] = Nil): Unit = {
-
-    val actualRunning = new ListBuffer[Int]()
-    val actualCompleted = new ListBuffer[Int]()
-    val actualFailed = new ListBuffer[Int]()
-
-    exec.get.jobs.foreach { case (jobId, jobStatus) =>
-      jobStatus match {
-        case JobExecutionStatus.RUNNING => actualRunning += jobId
-        case JobExecutionStatus.SUCCEEDED => actualCompleted += jobId
-        case JobExecutionStatus.FAILED => actualFailed += jobId
-        case _ => fail(s"Unexpected status $jobStatus")
-      }
-    }
-
-    assert(actualRunning.toSeq.sorted === running)
-    assert(actualCompleted.toSeq.sorted === completed)
-    assert(actualFailed.toSeq.sorted === failed)
-  }
-
-  test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
-    val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
-    val json = JsonProtocol.sparkEventToJson(event)
-    assertValidDataInJson(json,
-      parse("""
-        |{
-        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
-        |  "executionId": 1,
-        |  "accumUpdates": [[2,3]]
-        |}
-      """.stripMargin))
-    JsonProtocol.sparkEventFromJson(json) match {
-      case SparkListenerDriverAccumUpdates(executionId, accums) =>
-        assert(executionId == 1L)
-        accums.foreach { case (a, b) =>
-          assert(a == 2L)
-          assert(b == 3L)
-        }
-    }
-
-    // Test a case where the numbers in the JSON can only fit in longs:
-    val longJson = parse(
-      """
-        |{
-        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
-        |  "executionId": 4294967294,
-        |  "accumUpdates": [[4294967294,3]]
-        |}
-      """.stripMargin)
-    JsonProtocol.sparkEventFromJson(longJson) match {
-      case SparkListenerDriverAccumUpdates(executionId, accums) =>
-        assert(executionId == 4294967294L)
-        accums.foreach { case (a, b) =>
-          assert(a == 4294967294L)
-          assert(b == 3L)
-        }
-    }
-  }
-
-}
-
-
-/**
- * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a [[SQLMetrics]]
- * on the driver.
- */
-private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExecNode {
-  override def sparkContext: SparkContext = sc
-  override def output: Seq[Attribute] = Seq()
-
-  override val metrics: Map[String, SQLMetric] = Map(
-    "dummy" -> SQLMetrics.createMetric(sc, "dummy"))
-
-  override def doExecute(): RDD[InternalRow] = {
-    longMetric("dummy") += expectedValue
-
-    SQLMetrics.postDriverMetricUpdates(
-      sc,
-      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
-      metrics.values.toSeq)
-    sc.emptyRDD
-  }
-}
-
-
-class SQLListenerMemoryLeakSuite extends SparkFunSuite {
-
-  test("no memory leak") {
-    val conf = new SparkConf()
-      .setMaster("local")
-      .setAppName("test")
-      .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
-      .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
-      .set(ASYNC_TRACKING_ENABLED, false)
-    withSpark(new SparkContext(conf)) { sc =>
-      quietly {
-        val spark = new SparkSession(sc)
-        import spark.implicits._
-        // Run 100 successful executions and 100 failed executions.
-        // Each execution only has one job and one stage.
-        for (i <- 0 until 100) {
-          val df = Seq(
-            (1, 1),
-            (2, 2)
-          ).toDF()
-          df.collect()
-          try {
-            df.foreach(_ => throw new RuntimeException("Oops"))
-          } catch {
-            case e: SparkException => // This is expected for a failed job
-          }
-        }
-        sc.listenerBus.waitUntilEmpty(10000)
-
-        val statusStore = new SQLAppStatusStore(sc.statusStore.store)
-        assert(statusStore.executionsList().size <= 50)
-      }
-    }
-  }
-}

