Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19383#discussion_r142207345
  
    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.util.Date
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark._
    +import org.apache.spark.executor.TaskMetrics
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.status.api.v1
    +import org.apache.spark.storage._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.ui.scope._
    +import org.apache.spark.util.kvstore.KVStore
    +
    +/**
    + * A Spark listener that writes application information to a data store. The types written to the
    + * store are defined in the `storeTypes.scala` file and are based on the public REST API.
    + */
    +private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging {
    +
    +  private var sparkVersion = SPARK_VERSION
    +  private var appInfo: v1.ApplicationInfo = null
    +  private var coresPerTask: Int = 1
    +
    +  // Keep track of live entities, so that task metrics can be efficiently updated (without
    +  // causing too many writes to the underlying store, and other expensive operations).
    +  private val liveStages = new HashMap[(Int, Int), LiveStage]()
    +  private val liveJobs = new HashMap[Int, LiveJob]()
    +  private val liveExecutors = new HashMap[String, LiveExecutor]()
    +  private val liveTasks = new HashMap[Long, LiveTask]()
    +  private val liveRDDs = new HashMap[Int, LiveRDD]()
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case SparkListenerLogStart(version) => sparkVersion = version
    +    case _ =>
    +  }
    +
    +  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    +    assert(event.appId.isDefined, "Application without IDs are not supported.")
    +
    +    val attempt = new v1.ApplicationAttemptInfo(
    +      event.appAttemptId,
    +      new Date(event.time),
    +      new Date(-1),
    +      new Date(event.time),
    +      -1L,
    +      event.sparkUser,
    +      false,
    +      sparkVersion)
    +
    +    appInfo = new v1.ApplicationInfo(
    +      event.appId.get,
    +      event.appName,
    +      None,
    +      None,
    +      None,
    +      None,
    +      Seq(attempt))
    +
    +    kvstore.write(new ApplicationInfoWrapper(appInfo))
    +  }
    +
    +  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    +    val old = appInfo.attempts.head
    +    val attempt = new v1.ApplicationAttemptInfo(
    +      old.attemptId,
    +      old.startTime,
    +      new Date(event.time),
    +      new Date(event.time),
    +      event.time - old.startTime.getTime(),
    +      old.sparkUser,
    +      true,
    +      old.appSparkVersion)
    +
    +    appInfo = new v1.ApplicationInfo(
    +      appInfo.id,
    +      appInfo.name,
    +      None,
    +      None,
    +      None,
    +      None,
    +      Seq(attempt))
    +    kvstore.write(new ApplicationInfoWrapper(appInfo))
    +  }
    +
    +  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    +    // This needs to be an update in case an executor re-registers after the driver has
    +    // marked it as "dead".
    +    val exec = getOrCreateExecutor(event.executorId)
    +    exec.host = event.executorInfo.executorHost
    +    exec.isActive = true
    +    exec.totalCores = event.executorInfo.totalCores
    +    exec.maxTasks = event.executorInfo.totalCores / coresPerTask
    +    exec.executorLogs = event.executorInfo.logUrlMap
    +    update(exec)
    +  }
    +
    +  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
    +    liveExecutors.remove(event.executorId).foreach { exec =>
    +      exec.isActive = false
    +      update(exec)
    +    }
    +  }
    +
    +  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
    +    updateBlackListStatus(event.executorId, true)
    +  }
    +
    +  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
    +    updateBlackListStatus(event.executorId, false)
    +  }
    +
    +  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
    +    updateNodeBlackList(event.hostId, true)
    +  }
    +
    +  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
    +    updateNodeBlackList(event.hostId, false)
    +  }
    +
    +  private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
    +    liveExecutors.get(execId).foreach { exec =>
    +      exec.isBlacklisted = blacklisted
    +      update(exec)
    +    }
    +  }
    +
    +  private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = {
    +    // Implicitly (un)blacklist every executor associated with the node.
    +    liveExecutors.values.foreach { exec =>
    +      if (exec.hostname == host) {
    +        exec.isBlacklisted = blacklisted
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    // Compute (a potential underestimate of) the number of tasks that will be run by this job.
    +    // This may be an underestimate because the job start event references all of the result
    +    // stages' transitive stage dependencies, but some of these stages might be skipped if their
    +    // output is available from earlier runs.
    +    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
    +    val numTasks = {
    +      val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
    +      missingStages.map(_.numTasks).sum
    +    }
    +
    +    val lastStageInfo = event.stageInfos.lastOption
    +    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
    +
    +    val jobGroup = Option(event.properties)
    +      .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
    +
    +    val job = new LiveJob(
    +      event.jobId,
    +      lastStageName,
    +      Option(event.time).filter(_ >= 0).map(new Date(_)),
    --- End diff ---
    
    Yes, this was just copied over.
    
    Since it seems we only support logs back to 1.3 nowadays (SPARK-2261), I guess we can just use the time directly.
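    
    For illustration only (not part of the PR): a minimal, self-contained sketch of the
    difference between the guarded conversion in the diff and using the time directly as
    suggested above. The names here are made up for the example, and it assumes
    `event.time` is always a valid millisecond timestamp in 1.3+ event logs.
    
        import java.util.Date
        
        object SubmissionTimeSketch {
          def main(args: Array[String]): Unit = {
            // Stand-in for SparkListenerJobStart.time (milliseconds since epoch).
            val eventTime: Long = System.currentTimeMillis()
        
            // What the diff does: guard against a missing (negative) submission time
            // that very old event logs could carry.
            val guarded: Option[Date] = Option(eventTime).filter(_ >= 0).map(new Date(_))
        
            // What the comment suggests: with only 1.3+ logs supported, the timestamp
            // can be trusted and wrapped directly.
            val direct: Option[Date] = Some(new Date(eventTime))
        
            println(s"guarded = $guarded, direct = $direct")
          }
        }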

