Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19383#discussion_r146689668
  
    --- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.util.Date
    +
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark._
    +import org.apache.spark.executor.TaskMetrics
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.status.api.v1
    +import org.apache.spark.storage._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +/**
    + * A Spark listener that writes application information to a data store. 
The types written to the
    + * store are defined in the `storeTypes.scala` file and are based on the 
public REST API.
    + */
    +private class AppStatusListener(kvstore: KVStore) extends SparkListener 
with Logging {
    +
    +  private var sparkVersion = SPARK_VERSION
    +  private var appInfo: v1.ApplicationInfo = null
    +  private var coresPerTask: Int = 1
    +
    +  // Keep track of live entities, so that task metrics can be efficiently 
updated (without
    +  // causing too many writes to the underlying store, and other expensive 
operations).
    +  private val liveStages = new HashMap[(Int, Int), LiveStage]()
    +  private val liveJobs = new HashMap[Int, LiveJob]()
    +  private val liveExecutors = new HashMap[String, LiveExecutor]()
    +  private val liveTasks = new HashMap[Long, LiveTask]()
    +  private val liveRDDs = new HashMap[Int, LiveRDD]()
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
    +    case SparkListenerLogStart(version) => sparkVersion = version
    +    case _ =>
    +  }
    +
    +  override def onApplicationStart(event: SparkListenerApplicationStart): 
Unit = {
    +    assert(event.appId.isDefined, "Application without IDs are not 
supported.")
    +
    +    val attempt = new v1.ApplicationAttemptInfo(
    +      event.appAttemptId,
    +      new Date(event.time),
    +      new Date(-1),
    +      new Date(event.time),
    +      -1L,
    +      event.sparkUser,
    +      false,
    +      sparkVersion)
    +
    +    appInfo = new v1.ApplicationInfo(
    +      event.appId.get,
    +      event.appName,
    +      None,
    +      None,
    +      None,
    +      None,
    +      Seq(attempt))
    +
    +    kvstore.write(new ApplicationInfoWrapper(appInfo))
    +  }
    +
    +  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit 
= {
    +    val old = appInfo.attempts.head
    +    val attempt = new v1.ApplicationAttemptInfo(
    +      old.attemptId,
    +      old.startTime,
    +      new Date(event.time),
    +      new Date(event.time),
    +      event.time - old.startTime.getTime(),
    +      old.sparkUser,
    +      true,
    +      old.appSparkVersion)
    +
    +    appInfo = new v1.ApplicationInfo(
    +      appInfo.id,
    +      appInfo.name,
    +      None,
    +      None,
    +      None,
    +      None,
    +      Seq(attempt))
    +    kvstore.write(new ApplicationInfoWrapper(appInfo))
    +  }
    +
    +  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    +    // This needs to be an update in case an executor re-registers after 
the driver has
    +    // marked it as "dead".
    +    val exec = getOrCreateExecutor(event.executorId)
    +    exec.host = event.executorInfo.executorHost
    +    exec.isActive = true
    +    exec.totalCores = event.executorInfo.totalCores
    +    exec.maxTasks = event.executorInfo.totalCores / coresPerTask
    +    exec.executorLogs = event.executorInfo.logUrlMap
    +    update(exec)
    +  }
    +
    +  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): 
Unit = {
    +    liveExecutors.remove(event.executorId).foreach { exec =>
    +      exec.isActive = false
    +      update(exec)
    +    }
    +  }
    +
    +  override def onExecutorBlacklisted(event: 
SparkListenerExecutorBlacklisted): Unit = {
    +    updateBlackListStatus(event.executorId, true)
    +  }
    +
    +  override def onExecutorUnblacklisted(event: 
SparkListenerExecutorUnblacklisted): Unit = {
    +    updateBlackListStatus(event.executorId, false)
    +  }
    +
    +  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): 
Unit = {
    +    updateNodeBlackList(event.hostId, true)
    +  }
    +
    +  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): 
Unit = {
    +    updateNodeBlackList(event.hostId, false)
    +  }
    +
    +  private def updateBlackListStatus(execId: String, blacklisted: Boolean): 
Unit = {
    +    liveExecutors.get(execId).foreach { exec =>
    +      exec.isBlacklisted = blacklisted
    +      update(exec)
    +    }
    +  }
    +
    +  private def updateNodeBlackList(host: String, blacklisted: Boolean): 
Unit = {
    +    // Implicitly (un)blacklist every executor associated with the node.
    +    liveExecutors.values.foreach { exec =>
    +      if (exec.hostname == host) {
    +        exec.isBlacklisted = blacklisted
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    // Compute (a potential over-estimate of) the number of tasks that 
will be run by this job.
    +    // This may be an over-estimate because the job start event references 
all of the result
    +    // stages' transitive stage dependencies, but some of these stages 
might be skipped if their
    +    // output is available from earlier runs.
    +    // See https://github.com/apache/spark/pull/3009 for a more extensive 
discussion.
    +    val numTasks = {
    +      val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
    +      missingStages.map(_.numTasks).sum
    +    }
    +
    +    val lastStageInfo = event.stageInfos.lastOption
    +    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown 
Stage Name)")
    +
    +    val jobGroup = Option(event.properties)
    +      .flatMap { p => 
Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
    +
    +    val job = new LiveJob(
    +      event.jobId,
    +      lastStageName,
    +      Some(new Date(event.time)),
    +      event.stageIds,
    +      jobGroup,
    +      numTasks)
    +    liveJobs.put(event.jobId, job)
    +    update(job)
    +
    +    val schedulingPool = Option(event.properties).flatMap { p =>
    +      Option(p.getProperty("spark.scheduler.pool"))
    +    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
    +
    +    event.stageInfos.foreach { stageInfo =>
    +      // A new job submission may re-use an existing stage, so this code 
needs to do an update
    +      // instead of just a write.
    +      val stage = getOrCreateStage(stageInfo)
    +      stage.jobs :+= job
    +      stage.jobIds += event.jobId
    +      stage.schedulingPool = schedulingPool
    +      update(stage)
    +    }
    +
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveJobs.remove(event.jobId).foreach { job =>
    +      job.status = event.jobResult match {
    +        case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +        case JobFailed(_) => JobExecutionStatus.FAILED
    +      }
    +
    +      job.completionTime = Some(new Date(event.time))
    +      update(job)
    +    }
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit 
= {
    +    val stage = getOrCreateStage(event.stageInfo)
    +    stage.status = v1.StageStatus.ACTIVE
    +
    +    // Look at all active jobs to find the ones that mention this stage.
    +    stage.jobs = liveJobs.values
    +      .filter(_.stageIds.contains(event.stageInfo.stageId))
    +      .toSeq
    +    stage.jobIds = stage.jobs.map(_.jobId).toSet
    +
    +    stage.jobs.foreach { job =>
    +      job.completedStages = job.completedStages - event.stageInfo.stageId
    +      job.activeStages += 1
    +      update(job)
    +    }
    +
    +    event.stageInfo.rddInfos.foreach { info =>
    +      if (info.storageLevel.isValid) {
    +        update(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)))
    +      }
    +    }
    +
    +    update(stage)
    +  }
    +
    +  override def onTaskStart(event: SparkListenerTaskStart): Unit = {
    +    val task = new LiveTask(event.taskInfo, event.stageId, 
event.stageAttemptId)
    +    liveTasks.put(event.taskInfo.taskId, task)
    +    update(task)
    +
    +    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage 
=>
    +      stage.activeTasks += 1
    +      stage.firstLaunchTime = math.min(stage.firstLaunchTime, 
event.taskInfo.launchTime)
    +      update(stage)
    +
    +      stage.jobs.foreach { job =>
    +        job.activeTasks += 1
    +        update(job)
    +      }
    +    }
    +
    +    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
    +      exec.activeTasks += 1
    +      exec.totalTasks += 1
    +      update(exec)
    +    }
    +  }
    +
    +  override def onTaskGettingResult(event: SparkListenerTaskGettingResult): 
Unit = {
    +    liveTasks.get(event.taskInfo.taskId).foreach { task =>
    +      update(task)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    // If stage attempt id is -1, it means the DAGScheduler had no idea 
which attempt this task
    +    // completion event is for. Let's just drop it here. This means we 
might have some speculation
    +    // tasks on the web ui that are never marked as complete.
    +    if (event.taskInfo == null || event.stageAttemptId == -1) {
    +      return
    +    }
    +
    +    val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task 
=>
    +      val errorMessage = event.reason match {
    +        case Success =>
    +          None
    +        case k: TaskKilled =>
    +          Some(k.reason)
    +        case e: ExceptionFailure => // Handle ExceptionFailure because we 
might have accumUpdates
    +          Some(e.toErrorString)
    +        case e: TaskFailedReason => // All other failure cases
    +          Some(e.toErrorString)
    +        case other =>
    +          logInfo(s"Unhandled task end reason: $other")
    +          None
    +      }
    +      task.errorMessage = errorMessage
    +      val delta = task.updateMetrics(event.taskMetrics)
    +      update(task)
    +      delta
    +    }.orNull
    +
    +    val (completedDelta, failedDelta) = event.reason match {
    +      case Success =>
    +        (1, 0)
    +      case _ =>
    +        (0, 1)
    +    }
    +
    +    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage 
=>
    +      if (metricsDelta != null) {
    +        stage.metrics.update(metricsDelta)
    +      }
    +      stage.activeTasks -= 1
    +      stage.completedTasks += completedDelta
    +      stage.failedTasks += failedDelta
    +      update(stage)
    +
    +      stage.jobs.foreach { job =>
    +        job.activeTasks -= 1
    +        job.completedTasks += completedDelta
    +        job.failedTasks += failedDelta
    +        update(job)
    +      }
    +
    +      val esummary = stage.executorSummary(event.taskInfo.executorId)
    +      esummary.taskTime += event.taskInfo.duration
    +      esummary.succeededTasks += completedDelta
    +      esummary.failedTasks += failedDelta
    +      if (metricsDelta != null) {
    +        esummary.metrics.update(metricsDelta)
    +      }
    +      update(esummary)
    +    }
    +
    +    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
    +      if (event.taskMetrics != null) {
    +        val readMetrics = event.taskMetrics.shuffleReadMetrics
    +        exec.totalGcTime += event.taskMetrics.jvmGCTime
    +        exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead
    +        exec.totalShuffleRead += readMetrics.localBytesRead + 
readMetrics.remoteBytesRead
    +        exec.totalShuffleWrite += 
event.taskMetrics.shuffleWriteMetrics.bytesWritten
    +      }
    +
    +      exec.activeTasks -= 1
    +      exec.completedTasks += completedDelta
    +      exec.failedTasks += failedDelta
    +      exec.totalDuration += event.taskInfo.duration
    +      update(exec)
    +    }
    +  }
    +
    +  override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
    +    liveStages.remove((event.stageInfo.stageId, 
event.stageInfo.attemptId)).foreach { stage =>
    +      stage.info = event.stageInfo
    +
    +      // Because of SPARK-20205, old event logs may contain valid stages 
without a submission time
    +      // in their start event. In those cases, we can only detect whether 
a stage was skipped by
    +      // waiting until the completion event, at which point the field 
would have been set.
    +      stage.status = event.stageInfo.failureReason match {
    +        case Some(_) => v1.StageStatus.FAILED
    +        case _ if event.stageInfo.submissionTime.isDefined => 
v1.StageStatus.COMPLETE
    +        case _ => v1.StageStatus.SKIPPED
    +      }
    +      update(stage)
    +
    +      stage.jobs.foreach { job =>
    +        stage.status match {
    +          case v1.StageStatus.COMPLETE =>
    +            job.completedStages = job.completedStages + 
event.stageInfo.stageId
    +          case v1.StageStatus.SKIPPED =>
    +            job.skippedStages += event.stageInfo.stageId
    +            job.skippedTasks += event.stageInfo.numTasks
    +          case _ =>
    +            job.failedStages += 1
    +        }
    +        job.activeStages -= 1
    +        update(job)
    +      }
    +
    +      stage.executorSummaries.values.foreach(update)
    +      update(stage)
    +    }
    +  }
    +
    +  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): 
Unit = {
    +    // This needs to set fields that are already set by onExecutorAdded 
because the driver is
    +    // considered an "executor" in the UI, but does not have a 
SparkListenerExecutorAdded event.
    +    val exec = getOrCreateExecutor(event.blockManagerId.executorId)
    +    exec.hostPort = event.blockManagerId.hostPort
    +    event.maxOnHeapMem.foreach { _ =>
    +      exec.totalOnHeap = event.maxOnHeapMem.get
    +      exec.totalOffHeap = event.maxOffHeapMem.get
    +    }
    +    exec.isActive = true
    +    exec.maxMemory = event.maxMem
    +    update(exec)
    +  }
    +
    +  override def onBlockManagerRemoved(event: 
SparkListenerBlockManagerRemoved): Unit = {
    +    // Nothing to do here. Covered by onExecutorRemoved.
    +  }
    +
    +  override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
    +    liveRDDs.remove(event.rddId)
    +    kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: 
SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, sid, sAttempt, 
accumUpdates) =>
    +      liveTasks.get(taskId).foreach { task =>
    +        val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
    +        val delta = task.updateMetrics(metrics)
    +        update(task)
    +
    +        liveStages.get((sid, sAttempt)).foreach { stage =>
    +          stage.metrics.update(delta)
    +          update(stage)
    +
    +          val esummary = stage.executorSummary(event.execId)
    +          esummary.metrics.update(delta)
    +          update(esummary)
    +        }
    +      }
    +    }
    +  }
    +
    +  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
    +    event.blockUpdatedInfo.blockId match {
    +      case block: RDDBlockId => updateRDDBlock(event, block)
    +      case _ => // TODO: API only covers RDD storage. UI might need 
shuffle storage too.
    +    }
    +  }
    +
    +  private def updateRDDBlock(event: SparkListenerBlockUpdated, block: 
RDDBlockId): Unit = {
    +    val executorId = event.blockUpdatedInfo.blockManagerId.executorId
    +
    +    // Whether values are being added to or removed from the existing 
accounting.
    +    val storageLevel = event.blockUpdatedInfo.storageLevel
    +    val diskDelta = event.blockUpdatedInfo.diskSize * (if 
(storageLevel.useDisk) 1 else -1)
    +    val memoryDelta = event.blockUpdatedInfo.memSize * (if 
(storageLevel.useMemory) 1 else -1)
    +
    +    // Function to apply a delta to a value, but ensure that it doesn't go 
negative.
    +    def newValue(old: Long, delta: Long): Long = math.max(0, old + delta)
    +
    +    val updatedStorageLevel = if (storageLevel.isValid) {
    +      Some(storageLevel.description)
    +    } else {
    +      None
    +    }
    +
    +    // We need information about the executor to update some memory 
accounting values in the
    +    // RDD info, so read that beforehand.
    +    val maybeExec = liveExecutors.get(executorId)
    +    var rddBlocksDelta = 0
    +
    +    // Update the block entry in the RDD info, keeping track of the deltas 
above so that we
    +    // can update the executor information too.
    +    liveRDDs.get(block.rddId).foreach { rdd =>
    +      val partition = rdd.partition(block.name)
    +
    +      val executors = if (updatedStorageLevel.isDefined) {
    +        if (!partition.executors.contains(executorId)) {
    +          rddBlocksDelta = 1
    +        }
    +        partition.executors + executorId
    +      } else {
    +        rddBlocksDelta = -1
    +        partition.executors - executorId
    +      }
    +
    +      // Only update the partition if it's still stored in some executor, 
otherwise get rid of it.
    +      if (executors.nonEmpty) {
    +        if (updatedStorageLevel.isDefined) {
    +          partition.storageLevel = updatedStorageLevel.get
    +        }
    +        partition.memoryUsed = newValue(partition.memoryUsed, memoryDelta)
    +        partition.diskUsed = newValue(partition.diskUsed, diskDelta)
    +        partition.executors = executors
    +      } else {
    +        rdd.removePartition(block.name)
    +      }
    +
    +      maybeExec.foreach { exec =>
    +        if (exec.rddBlocks + rddBlocksDelta > 0) {
    +          val dist = rdd.distribution(exec)
    +          dist.memoryRemaining = newValue(dist.memoryRemaining, 
-memoryDelta)
    --- End diff --
    
    Memory remaining (and the on- and off-heap breakdown) looks wrong to me.
    
    `LiveRDDDistribution` is local to one `LiveRDD`.  But you will have 
partitions from multiple RDDs stored on one executor.  So the memory remaining 
needs to take into account all of the rdds on that executor.  I don't think you 
can keep this value precomputed -- an update to a totally different RDD would 
change it.  (unless on every block update, you update the value for all rdds 
stored on that executor.)
    
    The current UI handles this by storing it in the `StorageStatusListener` by 
executor, and populating this info in every request for the RDD info.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to