Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19681#discussion_r149579972

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.{JobExecutionStatus, SparkConf}
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric._
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.status.LiveEntity
+import org.apache.spark.status.config._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+private[sql] class SQLAppStatusListener(
+    conf: SparkConf,
+    kvstore: KVStore,
+    live: Boolean,
+    ui: Option[SparkUI] = None)
+  extends SparkListener with Logging {
+
+  // How often to flush intermediate state of a live execution to the store. When replaying logs,
+  // never flush (only do the very last write).
+  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
+
+  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
+  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
+
+  private var uiInitialized = false
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+    if (executionIdString == null) {
+      // This is not a job created by SQL
+      return
+    }
+
+    val executionId = executionIdString.toLong
+    val jobId = event.jobId
+    val exec = getOrCreateExecution(executionId)
+
+    // Record the accumulator IDs for the stages of this job, so that the code that keeps
+    // track of the metrics knows which accumulators to look at.
+    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
+    event.stageIds.foreach { id =>
+      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
+    }
+
+    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
+    exec.stages = event.stageIds
+    update(exec)
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    if (!isSQLStage(event.stageInfo.stageId)) {
+      return
+    }
+
+    // Reset the metrics tracking object for the new attempt.
+    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
+      metrics.taskMetrics.clear()
+      metrics.attemptId = event.stageInfo.attemptId
+    }
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+    liveExecutions.values.foreach { exec =>
+      if (exec.jobs.contains(event.jobId)) {
+        val result = event.jobResult match {
+          case JobSucceeded => JobExecutionStatus.SUCCEEDED
+          case _ => JobExecutionStatus.FAILED
+        }
+        exec.jobs = exec.jobs + (event.jobId -> result)
+        exec.endEvents += 1
+        update(exec)
+      }
+    }
+  }
+
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
+      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
+    }
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+    if (!isSQLStage(event.stageId)) {
+      return
+    }
+
+    val info = event.taskInfo
+    // SPARK-20342. If processing events from a live application, use the task metrics info to
+    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
+    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
+    val accums = if (live && event.taskMetrics != null) {
+      event.taskMetrics.externalAccums.flatMap { a =>
+        // This call may fail if the accumulator is gc'ed, so account for that.
+        try {
+          Some(a.toInfo(Some(a.value), None))
+        } catch {
+          case _: IllegalAccessError => None
+        }
+      }
+    } else {
+      info.accumulables
+    }
+    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
+      info.successful)
+  }
+
+  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
+    liveExecutions.get(executionId).map { exec =>
+      if (exec.metricsValues != null) {
+        exec.metricsValues
+      } else {
+        aggregateMetrics(exec)
+      }
+    }.getOrElse {
+      throw new NoSuchElementException(s"execution $executionId not found")
+    }
+  }
+
+  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
+    val metricIds = exec.metrics.map(_.accumulatorId).sorted
+    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
+    val metrics = exec.stages
+      .flatMap(stageMetrics.get)
+      .flatMap(_.taskMetrics.values().asScala)
+      .flatMap { metrics => metrics.ids.zip(metrics.values) }
+
+    (metrics ++ exec.driverAccumUpdates.toSeq)
+      .filter { case (id, _) => metricIds.contains(id) }
+      .groupBy(_._1)
+      .map { case (id, values) =>
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+      }
+  }
+
+  private def updateStageMetrics(
+      stageId: Int,
+      attemptId: Int,
+      taskId: Long,
+      accumUpdates: Seq[AccumulableInfo],
+      succeeded: Boolean): Unit = {
+    stageMetrics.get(stageId).foreach { metrics =>
+      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
+        return
+      }
+
+      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
+      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
+        return
+      }
+
+      val updates = accumUpdates
+        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
+        .sortBy(_.id)
+
+      if (updates.isEmpty) {
+        return
+      }
+
+      val ids = new Array[Long](updates.size)
+      val values = new Array[Long](updates.size)
+      updates.zipWithIndex.foreach { case (acc, idx) =>
+        ids(idx) = acc.id
+        // In a live application, accumulators have Long values, but when reading from event
+        // logs, they have String values. For now, assume all accumulators are Long and convert
+        // accordingly.
+        values(idx) = acc.update.get match {
+          case s: String => s.toLong
+          case l: Long => l
+          case o => throw new IllegalArgumentException(s"Unexpected: $o")
+        }
+      }
+
+      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
+    }
+  }
+
+  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+    // Install the SQL tab in a live app if it hasn't been initialized yet.
+    if (!uiInitialized) {
+      ui.foreach { _ui =>
+        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
+      }
+      uiInitialized = true
+    }
+
+    val SparkListenerSQLExecutionStart(executionId, description, details,
+      physicalPlanDescription, sparkPlanInfo, time) = event
+
+    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
+      nodes.map {
+        case cluster: SparkPlanGraphCluster =>
+          val storedCluster = new SparkPlanGraphClusterWrapper(
+            cluster.id,
+            cluster.name,
+            cluster.desc,
+            toStoredNodes(cluster.nodes),
+            cluster.metrics)
+          new SparkPlanGraphNodeWrapper(null, storedCluster)
+
+        case node =>
+          new SparkPlanGraphNodeWrapper(node, null)
+      }
+    }
+
+    val planGraph = SparkPlanGraph(sparkPlanInfo)
+    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
+      node.metrics.map { metric => (metric.accumulatorId, metric) }
+    }.toMap.values.toList
+
+    val graphToStore = new SparkPlanGraphWrapper(
+      executionId,
+      toStoredNodes(planGraph.nodes),
+      planGraph.edges)
+    kvstore.write(graphToStore)
+
+    val exec = getOrCreateExecution(executionId)
+    exec.description = description
+    exec.details = details
+    exec.physicalPlanDescription = physicalPlanDescription
+    exec.metrics = sqlPlanMetrics
+    exec.submissionTime = time
+    update(exec)
+  }
+
+  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    val SparkListenerSQLExecutionEnd(executionId, time) = event
+    liveExecutions.get(executionId).foreach { exec =>
+      synchronized {
+        exec.metricsValues = aggregateMetrics(exec)
+
+        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
+        val activeStages = liveExecutions.values.flatMap { other =>
+          if (other != exec) other.stages else Nil
+        }.toSet
+        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
+
+        exec.completionTime = Some(new Date(time))
+        exec.endEvents += 1
+
+        update(exec)
+      }
+    }
+  }
+
+  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
+    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
+    liveExecutions.get(executionId).foreach { exec =>
+      exec.driverAccumUpdates = accumUpdates.toMap
+      update(exec)
+    }
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
+    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
+    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
+    case _ => // Ignore
+  }
+
+  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
+    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
+  }
+
+  private def update(exec: LiveExecutionData): Unit = {
+    val now = System.nanoTime()
+    if (exec.endEvents >= exec.jobs.size + 1) {
+      liveExecutions.remove(exec.executionId)
+      exec.write(kvstore, now)
--- End diff --

That's true; I've actually fixed that in the M6 code:

https://github.com/vanzin/spark/pull/51/files#diff-a74d84702d8d47d5269e96740a55a3caR56

It's not very easy to fix here without writing throw-away code to propagate some "close()" call to this listener.
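For anyone following along, here is a minimal, self-contained sketch of the kind of plumbing that would be needed: something that owns the listener calls a close() hook during shutdown, and the hook flushes any buffered live state that never received its end event. All names here (FlushableListener, DemoStore, DemoListener) are made up for illustration; this is not the M6 code or Spark's actual API.

import scala.collection.mutable

// Hypothetical hook that the owning component would invoke at shutdown.
trait FlushableListener {
  def close(): Unit
}

// Stand-in for the KVStore the real listener writes to.
class DemoStore {
  def write(value: Any): Unit = println(s"wrote: $value")
}

class DemoListener(store: DemoStore) extends FlushableListener {
  // Live state, analogous to liveExecutions in the real listener.
  private val live = new mutable.HashMap[Long, String]()

  def onStart(id: Long): Unit = live(id) = s"execution-$id"

  // Normal path: the end event triggers the final write.
  def onEnd(id: Long): Unit = live.remove(id).foreach(store.write)

  // Shutdown path: flush executions that never saw their end event,
  // so they still land in the store.
  override def close(): Unit = {
    live.values.foreach(store.write)
    live.clear()
  }
}

object Demo extends App {
  val listener = new DemoListener(new DemoStore)
  listener.onStart(1)
  listener.onStart(2)
  listener.onEnd(1)  // written on end event
  listener.close()   // execution 2 flushed at shutdown
}

The sketch's whole point is that something has to own the listener's lifecycle and call close(); wiring that ownership through is exactly the throw-away plumbing being avoided in this PR.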