Repository: spark
Updated Branches:
  refs/heads/master 8a0ed5a5e -> d3a1d9527
[SPARK-22786][SQL] only use AppStatusPlugin in history server

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/19681 we introduced a new interface called `AppStatusPlugin` to register listeners and set up the UI for both the live UI and the history UI. However, I think it is overkill for the live UI. For example, we should not register `SQLListener` if users are not using SQL functions. Previously we registered the `SQLListener` and set up the SQL tab when a `SparkSession` was first created, which indicates that users are going to use SQL functions. But in #19681, we register the SQL listener during `SparkContext` creation. The same should apply to streaming.

I think we should keep the previous behavior and only use this new interface for the history server. To reflect this change, I also rename the new interface to `AppHistoryServerPlugin`. This PR also refines the tests for the SQL listener.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19981 from cloud-fan/listener.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3a1d952
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3a1d952
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3a1d952

Branch: refs/heads/master
Commit: d3a1d9527bcd6675cc45773f01d4558cf4b46b3d
Parents: 8a0ed5a
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Dec 22 01:08:13 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Dec 22 01:08:13 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala | 16 +-
 .../deploy/history/FsHistoryProvider.scala | 14 +-
 .../spark/status/AppHistoryServerPlugin.scala | 38 ++
 .../apache/spark/status/AppStatusListener.scala | 2 +-
 .../apache/spark/status/AppStatusPlugin.scala | 71 ---
 .../apache/spark/status/AppStatusStore.scala | 17 +-
 .../org/apache/spark/ui/StagePageSuite.scala | 20 +-
 ...g.apache.spark.status.AppHistoryServerPlugin | 1 +
 .../org.apache.spark.status.AppStatusPlugin | 1 -
 .../sql/execution/ui/SQLAppStatusListener.scala | 23 +-
 .../sql/execution/ui/SQLAppStatusStore.scala | 62 +--
 .../execution/ui/SQLHistoryServerPlugin.scala | 36 ++
 .../apache/spark/sql/internal/SharedState.scala | 18 +-
 .../sql/execution/metric/SQLMetricsSuite.scala | 8 -
 .../execution/metric/SQLMetricsTestUtils.scala | 10 +-
 .../ui/SQLAppStatusListenerSuite.scala | 531 +++++++++++++++++++
 .../sql/execution/ui/SQLListenerSuite.scala | 531 -------------------
 17 files changed, 668 insertions(+), 731 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 92e13ce..fcbeddd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend -import org.apache.spark.status.{AppStatusPlugin, AppStatusStore} +import 
org.apache.spark.status.AppStatusStore import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -416,7 +416,8 @@ class SparkContext(config: SparkConf) extends Logging { // Initialize the app status store and listener before SparkEnv is created so that it gets // all events. - _statusStore = AppStatusStore.createLiveStore(conf, l => listenerBus.addToStatusQueue(l)) + _statusStore = AppStatusStore.createLiveStore(conf) + listenerBus.addToStatusQueue(_statusStore.listener.get) // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) @@ -445,14 +446,9 @@ class SparkContext(config: SparkConf) extends Logging { // For tests, do not enable the UI None } - _ui.foreach { ui => - // Load any plugins that might want to modify the UI. - AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui)) - - // Bind the UI before starting the task scheduler to communicate - // the bound port to the cluster manager properly - ui.bind() - } + // Bind the UI before starting the task scheduler to communicate + // the bound port to the cluster manager properly + _ui.foreach(_.bind()) _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index fa2c519..a299b79 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -44,7 +44,6 @@ import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.status._ import org.apache.spark.status.KVUtils._ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} -import org.apache.spark.status.config._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} import org.apache.spark.util.kvstore._ @@ -322,15 +321,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) (new InMemoryStore(), true) } + val plugins = ServiceLoader.load( + classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala val trackingStore = new ElementTrackingStore(kvstore, conf) if (needReplay) { val replayBus = new ReplayListenerBus() val listener = new AppStatusListener(trackingStore, conf, false, lastUpdateTime = Some(attempt.info.lastUpdated.getTime())) replayBus.addListener(listener) - AppStatusPlugin.loadPlugins().foreach { plugin => - plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false) - } + for { + plugin <- plugins + listener <- plugin.createListeners(conf, trackingStore) + } replayBus.addListener(listener) try { val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) @@ -353,9 +355,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) HistoryServer.getAttemptURI(appId, attempt.info.attemptId), attempt.info.startTime.getTime(), attempt.info.appSparkVersion) - AppStatusPlugin.loadPlugins().foreach { plugin => - plugin.setupUI(ui) - } + plugins.foreach(_.setupUI(ui)) val loadedUI = 
LoadedAppUI(ui) http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala new file mode 100644 index 0000000..d144a0e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.ui.SparkUI + +/** + * An interface for creating history listeners(to replay event logs) defined in other modules like + * SQL, and setup the UI of the plugin to rebuild the history UI. + */ +private[spark] trait AppHistoryServerPlugin { + /** + * Creates listeners to replay the event logs. + */ + def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] + + /** + * Sets up UI of this plugin to rebuild the history UI. + */ + def setupUI(ui: SparkUI): Unit +} http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 87eb84d..4db797e 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -48,7 +48,7 @@ private[spark] class AppStatusListener( import config._ - private var sparkVersion = SPARK_VERSION + private val sparkVersion = SPARK_VERSION private var appInfo: v1.ApplicationInfo = null private var appSummary = new AppSummary(0, 0) private var coresPerTask: Int = 1 http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala deleted file mode 100644 index 4cada5c..0000000 --- a/core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status - -import java.util.ServiceLoader - -import scala.collection.JavaConverters._ - -import org.apache.spark.SparkConf -import org.apache.spark.scheduler.SparkListener -import org.apache.spark.ui.SparkUI -import org.apache.spark.util.Utils -import org.apache.spark.util.kvstore.KVStore - -/** - * An interface that defines plugins for collecting and storing application state. - * - * The plugin implementations are invoked for both live and replayed applications. For live - * applications, it's recommended that plugins defer creation of UI tabs until there's actual - * data to be shown. - */ -private[spark] trait AppStatusPlugin { - - /** - * Install listeners to collect data about the running application and populate the given - * store. - * - * @param conf The Spark configuration. - * @param store The KVStore where to keep application data. - * @param addListenerFn Function to register listeners with a bus. - * @param live Whether this is a live application (or an application being replayed by the - * HistoryServer). - */ - def setupListeners( - conf: SparkConf, - store: ElementTrackingStore, - addListenerFn: SparkListener => Unit, - live: Boolean): Unit - - /** - * Install any needed extensions (tabs, pages, etc) to a Spark UI. The plugin can detect whether - * the app is live or replayed by looking at the UI's SparkContext field `sc`. - * - * @param ui The Spark UI instance for the application. 
- */ - def setupUI(ui: SparkUI): Unit - -} - -private[spark] object AppStatusPlugin { - - def loadPlugins(): Iterable[AppStatusPlugin] = { - ServiceLoader.load(classOf[AppStatusPlugin], Utils.getContextOrSparkClassLoader).asScala - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 9987419..5a942f5 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -17,16 +17,14 @@ package org.apache.spark.status -import java.io.File -import java.util.{Arrays, List => JList} +import java.util.{List => JList} import scala.collection.JavaConverters._ import org.apache.spark.{JobExecutionStatus, SparkConf} -import org.apache.spark.scheduler.SparkListener import org.apache.spark.status.api.v1 import org.apache.spark.ui.scope._ -import org.apache.spark.util.{Distribution, Utils} +import org.apache.spark.util.Distribution import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} /** @@ -34,7 +32,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore} */ private[spark] class AppStatusStore( val store: KVStore, - listener: Option[AppStatusListener] = None) { + val listener: Option[AppStatusListener] = None) { def applicationInfo(): v1.ApplicationInfo = { store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info @@ -346,17 +344,10 @@ private[spark] object AppStatusStore { /** * Create an in-memory store for a live application. - * - * @param conf Configuration. - * @param addListenerFn Function to register a listener with a bus. */ - def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = { + def createLiveStore(conf: SparkConf): AppStatusStore = { val store = new ElementTrackingStore(new InMemoryStore(), conf) val listener = new AppStatusListener(store, conf, true) - addListenerFn(listener) - AppStatusPlugin.loadPlugins().foreach { p => - p.setupListeners(conf, store, addListenerFn, true) - } new AppStatusStore(store, listener = Some(listener)) } http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 46932a0..661d0d4 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -22,7 +22,6 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.mockito.Matchers.anyString import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} import org.apache.spark._ @@ -30,7 +29,6 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.status.AppStatusStore import org.apache.spark.ui.jobs.{StagePage, StagesTab} -import org.apache.spark.util.Utils class StagePageSuite extends SparkFunSuite with LocalSparkContext { @@ -55,12 +53,12 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { * This also runs a dummy stage to populate the page with useful content. 
*/ private def renderStagePage(conf: SparkConf): Seq[Node] = { - val bus = new ReplayListenerBus() - val store = AppStatusStore.createLiveStore(conf, l => bus.addListener(l)) + val statusStore = AppStatusStore.createLiveStore(conf) + val listener = statusStore.listener.get try { val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS) - when(tab.store).thenReturn(store) + when(tab.store).thenReturn(statusStore) val request = mock(classOf[HttpServletRequest]) when(tab.conf).thenReturn(conf) @@ -68,7 +66,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { when(tab.headerTabs).thenReturn(Seq.empty) when(request.getParameter("id")).thenReturn("0") when(request.getParameter("attempt")).thenReturn("0") - val page = new StagePage(tab, store) + val page = new StagePage(tab, statusStore) // Simulate a stage in job progress listener val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details") @@ -77,17 +75,17 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { taskId => val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false) - bus.postToAll(SparkListenerStageSubmitted(stageInfo)) - bus.postToAll(SparkListenerTaskStart(0, 0, taskInfo)) + listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo)) + listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo)) taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis()) val taskMetrics = TaskMetrics.empty taskMetrics.incPeakExecutionMemory(peakExecutionMemory) - bus.postToAll(SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics)) + listener.onTaskEnd(SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics)) } - bus.postToAll(SparkListenerStageCompleted(stageInfo)) + listener.onStageCompleted(SparkListenerStageCompleted(stageInfo)) page.render(request) } finally { - store.close() + statusStore.close() } } http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin ---------------------------------------------------------------------- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin new file mode 100644 index 0000000..0bba2f8 --- /dev/null +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin @@ -0,0 +1 @@ +org.apache.spark.sql.execution.ui.SQLHistoryServerPlugin http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin ---------------------------------------------------------------------- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin deleted file mode 100644 index ac6d7f6..0000000 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.AppStatusPlugin +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.sql.execution.ui.SQLAppStatusPlugin http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index cf0000c..aa78fa0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -30,15 +30,11 @@ import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity} import org.apache.spark.status.config._ -import org.apache.spark.ui.SparkUI -import org.apache.spark.util.kvstore.KVStore -private[sql] class SQLAppStatusListener( +class SQLAppStatusListener( conf: SparkConf, kvstore: ElementTrackingStore, - live: Boolean, - ui: Option[SparkUI] = None) - extends SparkListener with Logging { + live: Boolean) extends SparkListener with Logging { // How often to flush intermediate state of a live execution to the store. When replaying logs, // never flush (only do the very last write). @@ -50,7 +46,10 @@ private[sql] class SQLAppStatusListener( private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]() private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]() - private var uiInitialized = false + // Returns true if this listener has no live data. Exposed for tests only. + private[sql] def noLiveData(): Boolean = { + liveExecutions.isEmpty && stageMetrics.isEmpty + } kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get(UI_RETAINED_EXECUTIONS)) { count => cleanupExecutions(count) @@ -230,14 +229,6 @@ private[sql] class SQLAppStatusListener( } private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { - // Install the SQL tab in a live app if it hasn't been initialized yet. - if (!uiInitialized) { - ui.foreach { _ui => - new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui) - } - uiInitialized = true - } - val SparkListenerSQLExecutionStart(executionId, description, details, physicalPlanDescription, sparkPlanInfo, time) = event @@ -389,7 +380,7 @@ private class LiveStageMetrics( val accumulatorIds: Array[Long], val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics]) -private[sql] class LiveTaskMetrics( +private class LiveTaskMetrics( val ids: Array[Long], val values: Array[Long], val succeeded: Boolean) http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 7fd5f73..910f2e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -25,21 +25,17 @@ import scala.collection.mutable.ArrayBuffer import com.fasterxml.jackson.databind.annotation.JsonDeserialize -import org.apache.spark.{JobExecutionStatus, SparkConf} -import org.apache.spark.scheduler.SparkListener -import org.apache.spark.status.{AppStatusPlugin, ElementTrackingStore} +import org.apache.spark.JobExecutionStatus import org.apache.spark.status.KVUtils.KVIndexParam -import org.apache.spark.ui.SparkUI -import org.apache.spark.util.Utils import org.apache.spark.util.kvstore.KVStore /** * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. 
There's * no state kept in this class, so it's ok to have multiple instances of it in an application. */ -private[sql] class SQLAppStatusStore( +class SQLAppStatusStore( store: KVStore, - listener: Option[SQLAppStatusListener] = None) { + val listener: Option[SQLAppStatusListener] = None) { def executionsList(): Seq[SQLExecutionUIData] = { store.view(classOf[SQLExecutionUIData]).asScala.toSeq @@ -74,48 +70,9 @@ private[sql] class SQLAppStatusStore( def planGraph(executionId: Long): SparkPlanGraph = { store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph() } - -} - -/** - * An AppStatusPlugin for handling the SQL UI and listeners. - */ -private[sql] class SQLAppStatusPlugin extends AppStatusPlugin { - - override def setupListeners( - conf: SparkConf, - store: ElementTrackingStore, - addListenerFn: SparkListener => Unit, - live: Boolean): Unit = { - // For live applications, the listener is installed in [[setupUI]]. This also avoids adding - // the listener when the UI is disabled. Force installation during testing, though. - if (!live || Utils.isTesting) { - val listener = new SQLAppStatusListener(conf, store, live, None) - addListenerFn(listener) - } - } - - override def setupUI(ui: SparkUI): Unit = { - ui.sc match { - case Some(sc) => - // If this is a live application, then install a listener that will enable the SQL - // tab as soon as there's a SQL event posted to the bus. - val listener = new SQLAppStatusListener(sc.conf, - ui.store.store.asInstanceOf[ElementTrackingStore], true, Some(ui)) - sc.listenerBus.addToStatusQueue(listener) - - case _ => - // For a replayed application, only add the tab if the store already contains SQL data. - val sqlStore = new SQLAppStatusStore(ui.store.store) - if (sqlStore.executionsCount() > 0) { - new SQLTab(sqlStore, ui) - } - } - } - } -private[sql] class SQLExecutionUIData( +class SQLExecutionUIData( @KVIndexParam val executionId: Long, val description: String, val details: String, @@ -133,10 +90,9 @@ private[sql] class SQLExecutionUIData( * from the SQL listener instance. */ @JsonDeserialize(keyAs = classOf[JLong]) - val metricValues: Map[Long, String] - ) + val metricValues: Map[Long, String]) -private[sql] class SparkPlanGraphWrapper( +class SparkPlanGraphWrapper( @KVIndexParam val executionId: Long, val nodes: Seq[SparkPlanGraphNodeWrapper], val edges: Seq[SparkPlanGraphEdge]) { @@ -147,7 +103,7 @@ private[sql] class SparkPlanGraphWrapper( } -private[sql] class SparkPlanGraphClusterWrapper( +class SparkPlanGraphClusterWrapper( val id: Long, val name: String, val desc: String, @@ -163,7 +119,7 @@ private[sql] class SparkPlanGraphClusterWrapper( } /** Only one of the values should be set. 
*/ -private[sql] class SparkPlanGraphNodeWrapper( +class SparkPlanGraphNodeWrapper( val node: SparkPlanGraphNode, val cluster: SparkPlanGraphClusterWrapper) { @@ -174,7 +130,7 @@ private[sql] class SparkPlanGraphNodeWrapper( } -private[sql] case class SQLPlanMetric( +case class SQLPlanMetric( name: String, accumulatorId: Long, metricType: String) http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala new file mode 100644 index 0000000..522d0cf --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore} +import org.apache.spark.ui.SparkUI + +class SQLHistoryServerPlugin extends AppHistoryServerPlugin { + override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = { + Seq(new SQLAppStatusListener(conf, store, live = false)) + } + + override def setupUI(ui: SparkUI): Unit = { + val sqlStatusStore = new SQLAppStatusStore(ui.store.store) + if (sqlStatusStore.executionsCount() > 0) { + new SQLTab(sqlStatusStore, ui) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 3e479fa..baea4ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -28,11 +28,12 @@ import org.apache.hadoop.fs.FsUrlStreamHandlerFactory import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.LiveListenerBus -import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager +import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ +import 
org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.{MutableURLClassLoader, Utils} @@ -83,6 +84,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { val cacheManager: CacheManager = new CacheManager /** + * A status store to query SQL status/metrics of this Spark application, based on SQL-specific + * [[org.apache.spark.scheduler.SparkListenerEvent]]s. + */ + val statusStore: SQLAppStatusStore = { + val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore] + val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true) + sparkContext.listenerBus.addToStatusQueue(listener) + val statusStore = new SQLAppStatusStore(kvStore, Some(listener)) + sparkContext.ui.foreach(new SQLTab(statusStore, _)) + statusStore + } + + /** * A catalog that interacts with external systems. */ lazy val externalCatalog: ExternalCatalog = { http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index d588af3..fc34833 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -33,14 +33,6 @@ import org.apache.spark.util.{AccumulatorContext, JsonProtocol} class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with SharedSQLContext { import testImplicits._ - private def statusStore: SQLAppStatusStore = { - new SQLAppStatusStore(sparkContext.statusStore.store) - } - - private def currentExecutionIds(): Set[Long] = { - statusStore.executionsList.map(_.executionId).toSet - } - /** * Generates a `DataFrame` by filling randomly generated bytes for hash collision. */ http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index d89c4b1..122d287 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -31,17 +31,14 @@ import org.apache.spark.util.Utils trait SQLMetricsTestUtils extends SQLTestUtils { - import testImplicits._ - private def statusStore: SQLAppStatusStore = { - new SQLAppStatusStore(sparkContext.statusStore.store) - } - - private def currentExecutionIds(): Set[Long] = { + protected def currentExecutionIds(): Set[Long] = { statusStore.executionsList.map(_.executionId).toSet } + protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore + /** * Get execution metrics for the SQL execution and verify metrics values. 
* @@ -57,7 +54,6 @@ trait SQLMetricsTestUtils extends SQLTestUtils { assert(executionIds.size == 1) val executionId = executionIds.head - val executionData = statusStore.execution(executionId).get val executedNode = statusStore.planGraph(executionId).nodes.head val metricsNames = Seq( http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala new file mode 100644 index 0000000..5ebbeb4 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.util.Properties + +import scala.collection.mutable.ListBuffer + +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark._ +import org.apache.spark.LocalSparkContext._ +import org.apache.spark.internal.config +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler._ +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.status.config._ +import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} +import org.apache.spark.util.kvstore.InMemoryStore + + +class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { + import testImplicits._ + + override protected def sparkConf = { + super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L).set(ASYNC_TRACKING_ENABLED, false) + } + + private def createTestDataFrame: DataFrame = { + Seq( + (1, 1), + (2, 2) + ).toDF().filter("_1 > 1") + } + + private def createProperties(executionId: Long): Properties = { + val properties = new Properties() + properties.setProperty(SQLExecution.EXECUTION_ID_KEY, executionId.toString) + properties + } + + private def createStageInfo(stageId: Int, attemptId: Int): StageInfo = { + new StageInfo(stageId = stageId, + attemptId = attemptId, + // The following fields are not used in tests + name = "", + numTasks = 0, + 
rddInfos = Nil, + parentIds = Nil, + details = "") + } + + private def createTaskInfo( + taskId: Int, + attemptNumber: Int, + accums: Map[Long, Long] = Map.empty): TaskInfo = { + val info = new TaskInfo( + taskId = taskId, + attemptNumber = attemptNumber, + // The following fields are not used in tests + index = 0, + launchTime = 0, + executorId = "", + host = "", + taskLocality = null, + speculative = false) + info.markFinished(TaskState.FINISHED, 1L) + info.setAccumulables(createAccumulatorInfos(accums)) + info + } + + private def createAccumulatorInfos(accumulatorUpdates: Map[Long, Long]): Seq[AccumulableInfo] = { + accumulatorUpdates.map { case (id, value) => + val acc = new LongAccumulator + acc.metadata = AccumulatorMetadata(id, None, false) + acc.toInfo(Some(value), None) + }.toSeq + } + + private def assertJobs( + exec: Option[SQLExecutionUIData], + running: Seq[Int] = Nil, + completed: Seq[Int] = Nil, + failed: Seq[Int] = Nil): Unit = { + val actualRunning = new ListBuffer[Int]() + val actualCompleted = new ListBuffer[Int]() + val actualFailed = new ListBuffer[Int]() + + exec.get.jobs.foreach { case (jobId, jobStatus) => + jobStatus match { + case JobExecutionStatus.RUNNING => actualRunning += jobId + case JobExecutionStatus.SUCCEEDED => actualCompleted += jobId + case JobExecutionStatus.FAILED => actualFailed += jobId + case _ => fail(s"Unexpected status $jobStatus") + } + } + + assert(actualRunning.sorted === running) + assert(actualCompleted.sorted === completed) + assert(actualFailed.sorted === failed) + } + + private def createStatusStore(): SQLAppStatusStore = { + val conf = sparkContext.conf + val store = new ElementTrackingStore(new InMemoryStore, conf) + val listener = new SQLAppStatusListener(conf, store, live = true) + new SQLAppStatusStore(store, Some(listener)) + } + + test("basic") { + def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): Unit = { + assert(actual.size == expected.size) + expected.foreach { case (id, value) => + // The values in actual can be SQL metrics meaning that they contain additional formatting + // when converted to string. Verify that they start with the expected value. + // TODO: this is brittle. There is no requirement that the actual string needs to start + // with the accumulator value. 
+ assert(actual.contains(id)) + val v = actual.get(id).get.trim + assert(v.startsWith(value.toString), s"Wrong value for accumulator $id") + } + } + + val statusStore = createStatusStore() + val listener = statusStore.listener.get + + val executionId = 0 + val df = createTestDataFrame + val accumulatorIds = + SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)) + .allNodes.flatMap(_.metrics.map(_.accumulatorId)) + // Assume all accumulators are long + var accumulatorValue = 0L + val accumulatorUpdates = accumulatorIds.map { id => + accumulatorValue += 1L + (id, accumulatorValue) + }.toMap + + listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + + listener.onJobStart(SparkListenerJobStart( + jobId = 0, + time = System.currentTimeMillis(), + stageInfos = Seq( + createStageInfo(0, 0), + createStageInfo(1, 0) + ), + createProperties(executionId))) + listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) + + assert(statusStore.executionMetrics(executionId).isEmpty) + + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + // (task id, stage id, stage attempt, accum updates) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) + ))) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) + + // Driver accumulator updates don't belong to this execution should be filtered and no + // exception will be thrown. + listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) + + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + // (task id, stage id, stage attempt, accum updates) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) + ))) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3)) + + // Retrying a stage should reset the metrics + listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) + + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + // (task id, stage id, stage attempt, accum updates) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) + ))) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) + + // Ignore the task end for the first attempt + listener.onTaskEnd(SparkListenerTaskEnd( + stageId = 0, + stageAttemptId = 0, + taskType = "", + reason = null, + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) + + // Finish two tasks + listener.onTaskEnd(SparkListenerTaskEnd( + stageId = 0, + stageAttemptId = 1, + taskType = "", + reason = null, + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)), + null)) + listener.onTaskEnd(SparkListenerTaskEnd( + stageId = 0, + stageAttemptId = 1, + taskType = "", + reason = null, + createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), + null)) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5)) + + // Summit a new stage 
+ listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0))) + + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( + // (task id, stage id, stage attempt, accum updates) + (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates)) + ))) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7)) + + // Finish two tasks + listener.onTaskEnd(SparkListenerTaskEnd( + stageId = 1, + stageAttemptId = 0, + taskType = "", + reason = null, + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)), + null)) + listener.onTaskEnd(SparkListenerTaskEnd( + stageId = 1, + stageAttemptId = 0, + taskType = "", + reason = null, + createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), + null)) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11)) + + assertJobs(statusStore.execution(executionId), running = Seq(0)) + + listener.onJobEnd(SparkListenerJobEnd( + jobId = 0, + time = System.currentTimeMillis(), + JobSucceeded + )) + listener.onOtherEvent(SparkListenerSQLExecutionEnd( + executionId, System.currentTimeMillis())) + + assertJobs(statusStore.execution(executionId), completed = Seq(0)) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11)) + } + + test("onExecutionEnd happens before onJobEnd(JobSucceeded)") { + val statusStore = createStatusStore() + val listener = statusStore.listener.get + + val executionId = 0 + val df = createTestDataFrame + listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + listener.onJobStart(SparkListenerJobStart( + jobId = 0, + time = System.currentTimeMillis(), + stageInfos = Nil, + createProperties(executionId))) + listener.onOtherEvent(SparkListenerSQLExecutionEnd( + executionId, System.currentTimeMillis())) + listener.onJobEnd(SparkListenerJobEnd( + jobId = 0, + time = System.currentTimeMillis(), + JobSucceeded + )) + + assertJobs(statusStore.execution(executionId), completed = Seq(0)) + } + + test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { + val statusStore = createStatusStore() + val listener = statusStore.listener.get + + val executionId = 0 + val df = createTestDataFrame + listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + listener.onJobStart(SparkListenerJobStart( + jobId = 0, + time = System.currentTimeMillis(), + stageInfos = Nil, + createProperties(executionId))) + listener.onJobEnd(SparkListenerJobEnd( + jobId = 0, + time = System.currentTimeMillis(), + JobSucceeded + )) + + listener.onJobStart(SparkListenerJobStart( + jobId = 1, + time = System.currentTimeMillis(), + stageInfos = Nil, + createProperties(executionId))) + listener.onOtherEvent(SparkListenerSQLExecutionEnd( + executionId, System.currentTimeMillis())) + listener.onJobEnd(SparkListenerJobEnd( + jobId = 1, + time = System.currentTimeMillis(), + JobSucceeded + )) + + assertJobs(statusStore.execution(executionId), completed = Seq(0, 1)) + } + + test("onExecutionEnd happens before onJobEnd(JobFailed)") { + val statusStore = createStatusStore() + val listener = statusStore.listener.get + + val executionId = 0 + val df = 
createTestDataFrame + listener.onOtherEvent(SparkListenerSQLExecutionStart( + executionId, + "test", + "test", + df.queryExecution.toString, + SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), + System.currentTimeMillis())) + listener.onJobStart(SparkListenerJobStart( + jobId = 0, + time = System.currentTimeMillis(), + stageInfos = Seq.empty, + createProperties(executionId))) + listener.onOtherEvent(SparkListenerSQLExecutionEnd( + executionId, System.currentTimeMillis())) + listener.onJobEnd(SparkListenerJobEnd( + jobId = 0, + time = System.currentTimeMillis(), + JobFailed(new RuntimeException("Oops")) + )) + + assertJobs(statusStore.execution(executionId), failed = Seq(0)) + } + + test("SPARK-11126: no memory leak when running non SQL jobs") { + val listener = spark.sharedState.statusStore.listener.get + // At the beginning of this test case, there should be no live data in the listener. + assert(listener.noLiveData()) + spark.sparkContext.parallelize(1 to 10).foreach(i => ()) + spark.sparkContext.listenerBus.waitUntilEmpty(10000) + // Listener should ignore the non-SQL stages, as the stage data are only removed when SQL + // execution ends, which will not be triggered for non-SQL jobs. + assert(listener.noLiveData()) + } + + test("driver side SQL metrics") { + val statusStore = spark.sharedState.statusStore + val oldCount = statusStore.executionsList().size + + val expectedAccumValue = 12345 + val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue) + val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) { + override lazy val sparkPlan = physicalPlan + override lazy val executedPlan = physicalPlan + } + + SQLExecution.withNewExecutionId(spark, dummyQueryExecution) { + physicalPlan.execute().collect() + } + + // Wait until the new execution is started and being tracked. + while (statusStore.executionsCount() < oldCount) { + Thread.sleep(100) + } + + // Wait for listener to finish computing the metrics for the execution. 
+ while (statusStore.executionsList().last.metricValues == null) { + Thread.sleep(100) + } + + val execId = statusStore.executionsList().last.executionId + val metrics = statusStore.executionMetrics(execId) + val driverMetric = physicalPlan.metrics("dummy") + val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Seq(expectedAccumValue)) + + assert(metrics.contains(driverMetric.id)) + assert(metrics(driverMetric.id) === expectedValue) + } + + test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") { + val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L))) + val json = JsonProtocol.sparkEventToJson(event) + assertValidDataInJson(json, + parse(""" + |{ + | "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates", + | "executionId": 1, + | "accumUpdates": [[2,3]] + |} + """.stripMargin)) + JsonProtocol.sparkEventFromJson(json) match { + case SparkListenerDriverAccumUpdates(executionId, accums) => + assert(executionId == 1L) + accums.foreach { case (a, b) => + assert(a == 2L) + assert(b == 3L) + } + } + + // Test a case where the numbers in the JSON can only fit in longs: + val longJson = parse( + """ + |{ + | "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates", + | "executionId": 4294967294, + | "accumUpdates": [[4294967294,3]] + |} + """.stripMargin) + JsonProtocol.sparkEventFromJson(longJson) match { + case SparkListenerDriverAccumUpdates(executionId, accums) => + assert(executionId == 4294967294L) + accums.foreach { case (a, b) => + assert(a == 4294967294L) + assert(b == 3L) + } + } + } + +} + + +/** + * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a [[SQLMetrics]] + * on the driver. + */ +private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExecNode { + override def sparkContext: SparkContext = sc + override def output: Seq[Attribute] = Seq() + + override val metrics: Map[String, SQLMetric] = Map( + "dummy" -> SQLMetrics.createMetric(sc, "dummy")) + + override def doExecute(): RDD[InternalRow] = { + longMetric("dummy") += expectedValue + + SQLMetrics.postDriverMetricUpdates( + sc, + sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY), + metrics.values.toSeq) + sc.emptyRDD + } +} + + +class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite { + + test("no memory leak") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly + .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly + .set(ASYNC_TRACKING_ENABLED, false) + withSpark(new SparkContext(conf)) { sc => + quietly { + val spark = new SparkSession(sc) + import spark.implicits._ + // Run 100 successful executions and 100 failed executions. + // Each execution only has one job and one stage. + for (i <- 0 until 100) { + val df = Seq( + (1, 1), + (2, 2) + ).toDF() + df.collect() + try { + df.foreach(_ => throw new RuntimeException("Oops")) + } catch { + case e: SparkException => // This is expected for a failed job + } + } + sc.listenerBus.waitUntilEmpty(10000) + val statusStore = spark.sharedState.statusStore + assert(statusStore.executionsCount() <= 50) + // No live data should be left behind after all executions end. 
+ assert(statusStore.listener.get.noLiveData()) + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d3a1d952/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala deleted file mode 100644 index 9329506..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ /dev/null @@ -1,531 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.ui - -import java.util.Properties - -import scala.collection.mutable.ListBuffer - -import org.json4s.jackson.JsonMethods._ - -import org.apache.spark._ -import org.apache.spark.LocalSparkContext._ -import org.apache.spark.internal.config -import org.apache.spark.rdd.RDD -import org.apache.spark.scheduler._ -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.LocalRelation -import org.apache.spark.sql.catalyst.util.quietly -import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.status.ElementTrackingStore -import org.apache.spark.status.config._ -import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} -import org.apache.spark.util.kvstore.InMemoryStore - -class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { - import testImplicits._ - - override protected def sparkConf = { - super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L).set(ASYNC_TRACKING_ENABLED, false) - } - - private def createTestDataFrame: DataFrame = { - Seq( - (1, 1), - (2, 2) - ).toDF().filter("_1 > 1") - } - - private def createProperties(executionId: Long): Properties = { - val properties = new Properties() - properties.setProperty(SQLExecution.EXECUTION_ID_KEY, executionId.toString) - properties - } - - private def createStageInfo(stageId: Int, attemptId: Int): StageInfo = new StageInfo( - stageId = stageId, - attemptId = attemptId, - // The following fields are not used in tests - name = "", - numTasks = 0, - rddInfos = Nil, - parentIds = Nil, - details = "" - ) - - private def createTaskInfo( - taskId: Int, - attemptNumber: Int, - accums: Map[Long, Long] = Map()): TaskInfo = { - val info = new TaskInfo( - taskId = taskId, - attemptNumber = attemptNumber, - // The following 
fields are not used in tests - index = 0, - launchTime = 0, - executorId = "", - host = "", - taskLocality = null, - speculative = false) - info.markFinished(TaskState.FINISHED, 1L) - info.setAccumulables(createAccumulatorInfos(accums)) - info - } - - private def createAccumulatorInfos(accumulatorUpdates: Map[Long, Long]): Seq[AccumulableInfo] = { - accumulatorUpdates.map { case (id, value) => - val acc = new LongAccumulator - acc.metadata = AccumulatorMetadata(id, None, false) - acc.toInfo(Some(value), None) - }.toSeq - } - - /** Return the shared SQL store from the active SparkSession. */ - private def statusStore: SQLAppStatusStore = - new SQLAppStatusStore(spark.sparkContext.statusStore.store) - - /** - * Runs a test with a temporary SQLAppStatusStore tied to a listener bus. Events can be sent to - * the listener bus to update the store, and all data will be cleaned up at the end of the test. - */ - private def sqlStoreTest(name: String) - (fn: (SQLAppStatusStore, SparkListenerBus) => Unit): Unit = { - test(name) { - val conf = sparkConf - val store = new ElementTrackingStore(new InMemoryStore(), conf) - val bus = new ReplayListenerBus() - val listener = new SQLAppStatusListener(conf, store, true) - bus.addListener(listener) - store.close(false) - val sqlStore = new SQLAppStatusStore(store, Some(listener)) - fn(sqlStore, bus) - } - } - - sqlStoreTest("basic") { (store, bus) => - def checkAnswer(actual: Map[Long, String], expected: Map[Long, Long]): Unit = { - assert(actual.size == expected.size) - expected.foreach { case (id, value) => - // The values in actual can be SQL metrics meaning that they contain additional formatting - // when converted to string. Verify that they start with the expected value. - // TODO: this is brittle. There is no requirement that the actual string needs to start - // with the accumulator value. - assert(actual.contains(id)) - val v = actual.get(id).get.trim - assert(v.startsWith(value.toString), s"Wrong value for accumulator $id") - } - } - - val executionId = 0 - val df = createTestDataFrame - val accumulatorIds = - SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)) - .allNodes.flatMap(_.metrics.map(_.accumulatorId)) - // Assume all accumulators are long - var accumulatorValue = 0L - val accumulatorUpdates = accumulatorIds.map { id => - accumulatorValue += 1L - (id, accumulatorValue) - }.toMap - - bus.postToAll(SparkListenerSQLExecutionStart( - executionId, - "test", - "test", - df.queryExecution.toString, - SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), - System.currentTimeMillis())) - - bus.postToAll(SparkListenerJobStart( - jobId = 0, - time = System.currentTimeMillis(), - stageInfos = Seq( - createStageInfo(0, 0), - createStageInfo(1, 0) - ), - createProperties(executionId))) - bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) - - assert(store.executionMetrics(0).isEmpty) - - bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( - // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), - (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) - ))) - - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) - - // Driver accumulator updates don't belong to this execution should be filtered and no - // exception will be thrown. 
-    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
-
-    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, accum updates)
-      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
-    )))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
-
-    // Retrying a stage should reset the metrics
-    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
-
-    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, accum updates)
-      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
-    )))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
-
-    // Ignore the task end for the first attempt
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 0,
-      stageAttemptId = 0,
-      taskType = "",
-      reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
-      null))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
-
-    // Finish two tasks
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 0,
-      stageAttemptId = 1,
-      taskType = "",
-      reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
-      null))
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 0,
-      stageAttemptId = 1,
-      taskType = "",
-      reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
-      null))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
-
-    // Submit a new stage
-    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
-
-    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
-      // (task id, stage id, stage attempt, accum updates)
-      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
-    )))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
-
-    // Finish two tasks
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 1,
-      stageAttemptId = 0,
-      taskType = "",
-      reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
-      null))
-    bus.postToAll(SparkListenerTaskEnd(
-      stageId = 1,
-      stageAttemptId = 0,
-      taskType = "",
-      reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
-      null))
-
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
-
-    assertJobs(store.execution(0), running = Seq(0))
-
-    bus.postToAll(SparkListenerJobEnd(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      JobSucceeded
-    ))
-    bus.postToAll(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
-
-    assertJobs(store.execution(0), completed = Seq(0))
-    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
-  }
-
-  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
-    val executionId = 0
-    val df = createTestDataFrame
-    bus.postToAll(SparkListenerSQLExecutionStart(
-      executionId,
-      "test",
-      "test",
-      df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobStart(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      stageInfos = Nil,
-      createProperties(executionId)))
-    bus.postToAll(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobEnd(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      JobSucceeded
-    ))
-
-    assertJobs(store.execution(0), completed = Seq(0))
-  }
-
-  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
-    val executionId = 0
-    val df = createTestDataFrame
-    bus.postToAll(SparkListenerSQLExecutionStart(
-      executionId,
-      "test",
-      "test",
-      df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobStart(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      stageInfos = Nil,
-      createProperties(executionId)))
-    bus.postToAll(SparkListenerJobEnd(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      JobSucceeded
-    ))
-
-    bus.postToAll(SparkListenerJobStart(
-      jobId = 1,
-      time = System.currentTimeMillis(),
-      stageInfos = Nil,
-      createProperties(executionId)))
-    bus.postToAll(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobEnd(
-      jobId = 1,
-      time = System.currentTimeMillis(),
-      JobSucceeded
-    ))
-
-    assertJobs(store.execution(0), completed = Seq(0, 1))
-  }
-
-  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
-    val executionId = 0
-    val df = createTestDataFrame
-    bus.postToAll(SparkListenerSQLExecutionStart(
-      executionId,
-      "test",
-      "test",
-      df.queryExecution.toString,
-      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
-      System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobStart(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      stageInfos = Seq.empty,
-      createProperties(executionId)))
-    bus.postToAll(SparkListenerSQLExecutionEnd(
-      executionId, System.currentTimeMillis()))
-    bus.postToAll(SparkListenerJobEnd(
-      jobId = 0,
-      time = System.currentTimeMillis(),
-      JobFailed(new RuntimeException("Oops"))
-    ))
-
-    assertJobs(store.execution(0), failed = Seq(0))
-  }
-
-  test("SPARK-11126: no memory leak when running non SQL jobs") {
-    val previousStageNumber = statusStore.executionsList().size
-    spark.sparkContext.parallelize(1 to 10).foreach(i => ())
-    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-    // listener should ignore the non SQL stage
-    assert(statusStore.executionsList().size == previousStageNumber)
-
-    spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
-    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-    // listener should save the SQL stage
-    assert(statusStore.executionsList().size == previousStageNumber + 1)
-  }
-
-  test("driver side SQL metrics") {
-    val oldCount = statusStore.executionsList().size
-    val expectedAccumValue = 12345L
-    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
-    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
-      override lazy val sparkPlan = physicalPlan
-      override lazy val executedPlan = physicalPlan
-    }
-
-    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
-      physicalPlan.execute().collect()
-    }
-
-    while (statusStore.executionsList().size < oldCount) {
-      Thread.sleep(100)
-    }
-
-    // Wait for listener to finish computing the metrics for the execution.
-    while (statusStore.executionsList().last.metricValues == null) {
-      Thread.sleep(100)
-    }
-
-    val execId = statusStore.executionsList().last.executionId
-    val metrics = statusStore.executionMetrics(execId)
-    val driverMetric = physicalPlan.metrics("dummy")
-    val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Seq(expectedAccumValue))
-
-    assert(metrics.contains(driverMetric.id))
-    assert(metrics(driverMetric.id) === expectedValue)
-  }
-
-  private def assertJobs(
-      exec: Option[SQLExecutionUIData],
-      running: Seq[Int] = Nil,
-      completed: Seq[Int] = Nil,
-      failed: Seq[Int] = Nil): Unit = {
-
-    val actualRunning = new ListBuffer[Int]()
-    val actualCompleted = new ListBuffer[Int]()
-    val actualFailed = new ListBuffer[Int]()
-
-    exec.get.jobs.foreach { case (jobId, jobStatus) =>
-      jobStatus match {
-        case JobExecutionStatus.RUNNING => actualRunning += jobId
-        case JobExecutionStatus.SUCCEEDED => actualCompleted += jobId
-        case JobExecutionStatus.FAILED => actualFailed += jobId
-        case _ => fail(s"Unexpected status $jobStatus")
-      }
-    }
-
-    assert(actualRunning.toSeq.sorted === running)
-    assert(actualCompleted.toSeq.sorted === completed)
-    assert(actualFailed.toSeq.sorted === failed)
-  }
-
-  test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
-    val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
-    val json = JsonProtocol.sparkEventToJson(event)
-    assertValidDataInJson(json,
-      parse("""
-        |{
-        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
-        |  "executionId": 1,
-        |  "accumUpdates": [[2,3]]
-        |}
-      """.stripMargin))
-    JsonProtocol.sparkEventFromJson(json) match {
-      case SparkListenerDriverAccumUpdates(executionId, accums) =>
-        assert(executionId == 1L)
-        accums.foreach { case (a, b) =>
-          assert(a == 2L)
-          assert(b == 3L)
-        }
-    }
-
-    // Test a case where the numbers in the JSON can only fit in longs:
-    val longJson = parse(
-      """
-        |{
-        |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
-        |  "executionId": 4294967294,
-        |  "accumUpdates": [[4294967294,3]]
-        |}
-      """.stripMargin)
-    JsonProtocol.sparkEventFromJson(longJson) match {
-      case SparkListenerDriverAccumUpdates(executionId, accums) =>
-        assert(executionId == 4294967294L)
-        accums.foreach { case (a, b) =>
-          assert(a == 4294967294L)
-          assert(b == 3L)
-        }
-    }
-  }
-
-}
-
-
-/**
- * A dummy [[org.apache.spark.sql.execution.SparkPlan]] that updates a [[SQLMetrics]]
- * on the driver.
- */
-private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExecNode {
-  override def sparkContext: SparkContext = sc
-  override def output: Seq[Attribute] = Seq()
-
-  override val metrics: Map[String, SQLMetric] = Map(
-    "dummy" -> SQLMetrics.createMetric(sc, "dummy"))
-
-  override def doExecute(): RDD[InternalRow] = {
-    longMetric("dummy") += expectedValue
-
-    SQLMetrics.postDriverMetricUpdates(
-      sc,
-      sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY),
-      metrics.values.toSeq)
-    sc.emptyRDD
-  }
-}
-
-
-class SQLListenerMemoryLeakSuite extends SparkFunSuite {
-
-  test("no memory leak") {
-    val conf = new SparkConf()
-      .setMaster("local")
-      .setAppName("test")
-      .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
-      .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
-      .set(ASYNC_TRACKING_ENABLED, false)
-    withSpark(new SparkContext(conf)) { sc =>
-      quietly {
-        val spark = new SparkSession(sc)
-        import spark.implicits._
-        // Run 100 successful executions and 100 failed executions.
-        // Each execution only has one job and one stage.
-        for (i <- 0 until 100) {
-          val df = Seq(
-            (1, 1),
-            (2, 2)
-          ).toDF()
-          df.collect()
-          try {
-            df.foreach(_ => throw new RuntimeException("Oops"))
-          } catch {
-            case e: SparkException => // This is expected for a failed job
-          }
-        }
-        sc.listenerBus.waitUntilEmpty(10000)
-
-        val statusStore = new SQLAppStatusStore(sc.statusStore.store)
-        assert(statusStore.executionsList().size <= 50)
-      }
-    }
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org