Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19383#discussion_r146703962
  
    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.util.Date
    +
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark._
    +import org.apache.spark.executor.TaskMetrics
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.status.api.v1
    +import org.apache.spark.storage._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +/**
    + * A Spark listener that writes application information to a data store. The types written to the
    + * store are defined in the `storeTypes.scala` file and are based on the public REST API.
    + */
    +private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging {
    +
    +  private var sparkVersion = SPARK_VERSION
    +  private var appInfo: v1.ApplicationInfo = null
    +  private var coresPerTask: Int = 1
    +
    +  // Keep track of live entities, so that task metrics can be efficiently updated (without
    +  // causing too many writes to the underlying store, and other expensive operations).
    +  private val liveStages = new HashMap[(Int, Int), LiveStage]()
    +  private val liveJobs = new HashMap[Int, LiveJob]()
    +  private val liveExecutors = new HashMap[String, LiveExecutor]()
    +  private val liveTasks = new HashMap[Long, LiveTask]()
    +  private val liveRDDs = new HashMap[Int, LiveRDD]()
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case SparkListenerLogStart(version) => sparkVersion = version
    +    case _ =>
    +  }
    +
    +  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    +    assert(event.appId.isDefined, "Applications without IDs are not supported.")
    +
    +    val attempt = new v1.ApplicationAttemptInfo(
    +      event.appAttemptId,
    +      new Date(event.time),
    +      new Date(-1),
    +      new Date(event.time),
    +      -1L,
    +      event.sparkUser,
    +      false,
    +      sparkVersion)
    +
    +    appInfo = new v1.ApplicationInfo(
    +      event.appId.get,
    +      event.appName,
    +      None,
    +      None,
    +      None,
    +      None,
    +      Seq(attempt))
    +
    +    kvstore.write(new ApplicationInfoWrapper(appInfo))
    +  }
    +
    +  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    +    val old = appInfo.attempts.head
    +    val attempt = new v1.ApplicationAttemptInfo(
    +      old.attemptId,
    +      old.startTime,
    +      new Date(event.time),
    +      new Date(event.time),
    +      event.time - old.startTime.getTime(),
    +      old.sparkUser,
    +      true,
    +      old.appSparkVersion)
    +
    +    appInfo = new v1.ApplicationInfo(
    +      appInfo.id,
    +      appInfo.name,
    +      None,
    +      None,
    +      None,
    +      None,
    +      Seq(attempt))
    +    kvstore.write(new ApplicationInfoWrapper(appInfo))
    +  }
    +
    +  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    +    // This needs to be an update in case an executor re-registers after the driver has
    +    // marked it as "dead".
    +    val exec = getOrCreateExecutor(event.executorId)
    +    exec.host = event.executorInfo.executorHost
    +    exec.isActive = true
    +    exec.totalCores = event.executorInfo.totalCores
    +    exec.maxTasks = event.executorInfo.totalCores / coresPerTask
    +    exec.executorLogs = event.executorInfo.logUrlMap
    +    update(exec)
    +  }
    +
    +  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    +    liveExecutors.remove(event.executorId).foreach { exec =>
    +      exec.isActive = false
    +      update(exec)
    +    }
    +  }
    +
    +  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
    +    updateBlackListStatus(event.executorId, true)
    +  }
    +
    +  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
    +    updateBlackListStatus(event.executorId, false)
    +  }
    +
    +  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
    +    updateNodeBlackList(event.hostId, true)
    +  }
    +
    +  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
    +    updateNodeBlackList(event.hostId, false)
    +  }
    +
    +  private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
    +    liveExecutors.get(execId).foreach { exec =>
    +      exec.isBlacklisted = blacklisted
    +      update(exec)
    +    }
    +  }
    +
    +  private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = {
    +    // Implicitly (un)blacklist every executor associated with the node.
    +    liveExecutors.values.foreach { exec =>
    +      if (exec.hostname == host) {
    +        exec.isBlacklisted = blacklisted
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    // Compute (a potential over-estimate of) the number of tasks that will be run by this job.
    +    // This may be an over-estimate because the job start event references all of the result
    +    // stages' transitive stage dependencies, but some of these stages might be skipped if their
    +    // output is available from earlier runs.
    +    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
    +    val numTasks = {
    +      val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
    +      missingStages.map(_.numTasks).sum
    +    }
    +
    +    val lastStageInfo = event.stageInfos.lastOption
    +    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
    +
    +    val jobGroup = Option(event.properties)
    +      .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
    +
    +    val job = new LiveJob(
    +      event.jobId,
    +      lastStageName,
    +      Some(new Date(event.time)),
    +      event.stageIds,
    +      jobGroup,
    +      numTasks)
    +    liveJobs.put(event.jobId, job)
    +    update(job)
    +
    +    val schedulingPool = Option(event.properties).flatMap { p =>
    +      Option(p.getProperty("spark.scheduler.pool"))
    +    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
    +
    +    event.stageInfos.foreach { stageInfo =>
    +      // A new job submission may re-use an existing stage, so this code needs to do an update
    +      // instead of just a write.
    +      val stage = getOrCreateStage(stageInfo)
    +      stage.jobs :+= job
    +      stage.jobIds += event.jobId
    +      stage.schedulingPool = schedulingPool
    +      update(stage)
    +    }
    +
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveJobs.remove(event.jobId).foreach { job =>
    +      job.status = event.jobResult match {
    +        case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +        case JobFailed(_) => JobExecutionStatus.FAILED
    +      }
    +
    +      job.completionTime = Some(new Date(event.time))
    +      update(job)
    +    }
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    val stage = getOrCreateStage(event.stageInfo)
    +    stage.status = v1.StageStatus.ACTIVE
    +
    +    // Look at all active jobs to find the ones that mention this stage.
    +    stage.jobs = liveJobs.values
    +      .filter(_.stageIds.contains(event.stageInfo.stageId))
    +      .toSeq
    +    stage.jobIds = stage.jobs.map(_.jobId).toSet
    +
    +    stage.jobs.foreach { job =>
    +      job.completedStages = job.completedStages - event.stageInfo.stageId
    +      job.activeStages += 1
    +      update(job)
    +    }
    +
    +    event.stageInfo.rddInfos.foreach { info =>
    +      if (info.storageLevel.isValid) {
    +        update(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)))
    +      }
    +    }
    +
    +    update(stage)
    +  }
    +
    +  override def onTaskStart(event: SparkListenerTaskStart): Unit = {
    +    val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId)
    +    liveTasks.put(event.taskInfo.taskId, task)
    +    update(task)
    +
    +    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
    +      stage.activeTasks += 1
    +      stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
    +      update(stage)
    +
    +      stage.jobs.foreach { job =>
    +        job.activeTasks += 1
    +        update(job)
    +      }
    +    }
    +
    +    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
    +      exec.activeTasks += 1
    +      exec.totalTasks += 1
    +      update(exec)
    +    }
    +  }
    +
    +  override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = {
    +    liveTasks.get(event.taskInfo.taskId).foreach { task =>
    +      update(task)
    --- End diff --
    
    Hmm, I thought I needed to handle this event later on, but it looks like I don't, so it can go away.
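    
    For context, `org.apache.spark.scheduler.SparkListener` already provides no-op defaults for every callback, so deleting this override just falls back to the inherited no-op. A minimal standalone sketch (not part of this patch; the class name is made up) of a listener that only overrides the events it actually tracks:
    
    ```scala
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
    
    // Hypothetical example: only the callbacks a listener cares about need to be
    // overridden; everything else (including onTaskGettingResult) keeps the
    // no-op default inherited from SparkListener.
    class MinimalTaskListener extends SparkListener {
      override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
        // final per-task bookkeeping would go here
      }
    }
    ```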


---
