Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149578074
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
 ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate stage of a live execution to the 
store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) 
conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = 
event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the 
code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, 
new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, 
accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the 
task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not 
contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the 
accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for 
that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, 
accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = 
synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] 
= synchronized {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, 
m.metricType) }.toMap
    +    val metrics = exec.stages
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), 
values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || 
metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && 
metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when 
reading from event
    +        // logs, they have String values. For now, assume all accumulators 
are Long and covert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, 
succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): 
Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): 
Seq[SparkPlanGraphNodeWrapper] = {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not 
active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: 
SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = 
{
    +    liveExecutions.getOrElseUpdate(executionId, new 
LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    --- End diff --
    
    sorry, I meant if the _application_ is killed, so the event log just ends 
abruptly.  I think the old history server code would still show you the updated 
metrics for all the tasks that had completed.  But seems like after this 
change, the history server won't show anything for any jobs which hadn't 
completed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to