Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149579972
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate state of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from event
    +        // logs, they have String values. For now, assume all accumulators are Long and convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
    +    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    --- End diff --
    
    That's true; I've actually fixed that in the M6 code:
    
    https://github.com/vanzin/spark/pull/51/files#diff-a74d84702d8d47d5269e96740a55a3caR56
    
    It's not very easy to fix here without writing throw-away code to propagate some "close()" call to this listener.

