Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/42#discussion_r10596945
  
    --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---
    @@ -106,121 +114,154 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
         }
         description.map(d => stageIdToDescription(stage.stageId) = d)
     
    -    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[StageInfo]())
    -    stages += stage
    +    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
    +    stages(stage.stageId) = stage
       }
     
       override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
    -    val sid = taskStart.task.stageId
    -    val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
    -    tasksActive += taskStart.taskInfo
    -    val taskList = stageIdToTaskInfos.getOrElse(
    -      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
    -    taskList += ((taskStart.taskInfo, None, None))
    -    stageIdToTaskInfos(sid) = taskList
    +    val sid = taskStart.stageId
    +    val taskInfo = taskStart.taskInfo
    +    if (taskInfo != null) {
    +      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
    +      tasksActive(taskInfo.taskId) = taskInfo
    +      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
    +      taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
    +      stageIdToTaskData(sid) = taskMap
    +    }
       }
     
    -  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult)
    -      = synchronized {
    +  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) {
         // Do nothing: because we don't do a deep copy of the TaskInfo, the TaskInfo in
         // stageToTaskInfos already has the updated status.
       }
     
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
    -    val sid = taskEnd.task.stageId
    -
    -    // create executor summary map if necessary
    -    val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
    -      op = new HashMap[String, ExecutorSummary]())
    -    executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
    -      op = new ExecutorSummary())
    -
    -    val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId)
    -    executorSummary match {
    -      case Some(y) => {
    -        // first update failed-task, succeed-task
    +    val sid = taskEnd.stageId
    +    val info = taskEnd.taskInfo
    +
    +    if (info != null) {
    +      // create executor summary map if necessary
    +      val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
    +        op = new HashMap[String, ExecutorSummary]())
    +      executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)
    +
    +      val executorSummary = executorSummaryMap.get(info.executorId)
    +      executorSummary match {
    +        case Some(y) => {
    +          // first update failed-task, succeed-task
    +          taskEnd.reason match {
    +            case Success =>
    +              y.succeededTasks += 1
    +            case _ =>
    +              y.failedTasks += 1
    +          }
    +
    +          // update duration
    +          y.taskTime += info.duration
    +
    +          val metrics = taskEnd.taskMetrics
    +          if (metrics != null) {
    +            metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
    +            metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
    +            y.memoryBytesSpilled += metrics.memoryBytesSpilled
    +            y.diskBytesSpilled += metrics.diskBytesSpilled
    +          }
    +        }
    +        case _ => {}
    +      }
    +
    +      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
    +      // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
    +      tasksActive.remove(info.taskId)
    +
    +      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
             taskEnd.reason match {
    -          case Success =>
    -            y.succeededTasks += 1
    +          case e: ExceptionFailure =>
    +            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
    +            (Some(e), e.metrics)
               case _ =>
    -            y.failedTasks += 1
    +            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
    +            (None, Option(taskEnd.taskMetrics))
             }
     
    -        // update duration
    -        y.taskTime += taskEnd.taskInfo.duration
    +      stageIdToTime.getOrElseUpdate(sid, 0L)
    +      val time = metrics.map(_.executorRunTime).getOrElse(0L)
    +      stageIdToTime(sid) += time
    +      totalTime += time
     
    -        Option(taskEnd.taskMetrics).foreach { taskMetrics =>
    -          taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
    -          taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
    -          y.memoryBytesSpilled += taskMetrics.memoryBytesSpilled
    -          y.diskBytesSpilled += taskMetrics.diskBytesSpilled
    -        }
    -      }
    -      case _ => {}
    -    }
    +      stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
    +      val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
    +      stageIdToShuffleRead(sid) += shuffleRead
    +      totalShuffleRead += shuffleRead
     
    -    val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
    -    tasksActive -= taskEnd.taskInfo
    -
    -    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
    -      taskEnd.reason match {
    -        case e: ExceptionFailure =>
    -          stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
    -          (Some(e), e.metrics)
    -        case _ =>
    -          stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
    -          (None, Option(taskEnd.taskMetrics))
    -      }
    +      stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
    +      val shuffleWrite =
    +        metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
    +      stageIdToShuffleWrite(sid) += shuffleWrite
    +      totalShuffleWrite += shuffleWrite
     
    -    stageIdToTime.getOrElseUpdate(sid, 0L)
    -    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
    -    stageIdToTime(sid) += time
    -    totalTime += time
    -
    -    stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
    -    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
    -      s.remoteBytesRead).getOrElse(0L)
    -    stageIdToShuffleRead(sid) += shuffleRead
    -    totalShuffleRead += shuffleRead
    -
    -    stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
    -    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
    -      s.shuffleBytesWritten).getOrElse(0L)
    -    stageIdToShuffleWrite(sid) += shuffleWrite
    -    totalShuffleWrite += shuffleWrite
    -
    -    stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
    -    val memoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled).getOrElse(0L)
    -    stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
    -
    -    stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
    -    val diskBytesSpilled = metrics.map(m => m.diskBytesSpilled).getOrElse(0L)
    -    stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
    -
    -    val taskList = stageIdToTaskInfos.getOrElse(
    -      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
    -    taskList -= ((taskEnd.taskInfo, None, None))
    -    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
    -    stageIdToTaskInfos(sid) = taskList
    +      stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
    +      val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
    +      stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
    +
    +      stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
    +      val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
    +      stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
    +
    +      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
    +      taskMap(info.taskId) = new TaskUIData(info, metrics, failureInfo)
    +      stageIdToTaskData(sid) = taskMap
    +    }
       }
     
       override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
    -    jobEnd match {
    -      case end: SparkListenerJobEnd =>
    -        end.jobResult match {
    -          case JobFailed(ex, Some(stage)) =>
    -            /* If two jobs share a stage we could get this failure message twice. So we first
    -            *  check whether we've already retired this stage. */
    -            val stageInfo = activeStages.filter(s => s.stageId == stage.id).headOption
    -            stageInfo.foreach {s =>
    -              activeStages -= s
    -              poolToActiveStages(stageIdToPool(stage.id)) -= s
    -              failedStages += s
    -              trimIfNecessary(failedStages)
    -            }
    -          case _ =>
    +    jobEnd.jobResult match {
    +      case JobFailed(_, stageId) =>
    +        activeStages.get(stageId).foreach { s =>
    +          // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
    +          activeStages.remove(s.stageId)
    +          poolToActiveStages(stageIdToPool(stageId)).remove(s.stageId)
    +          failedStages += s
    +          trimIfNecessary(failedStages)
             }
           case _ =>
         }
       }
    +
    +  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
    +    synchronized {
    +      val schedulingModeName =
    +        environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode")
    +      schedulingMode = schedulingModeName match {
    +        case Some(name) => Some(SchedulingMode.withName(name))
    +        case None => None
    +      }
    +    }
    +  }
    +
    +  override def onBlockManagerGained(blockManagerGained: SparkListenerBlockManagerGained) {
    --- End diff ---
    
    Elsewhere in the code we tend to use `Added` for this type of thing, i.e. `onBlockManagerAdded`. That sounds a bit more natural to me than `gained`.
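    
    For illustration, a rough sketch of what the rename could look like (hypothetical; this assumes the event class would also be renamed to `SparkListenerBlockManagerAdded` so the callback and event names match):
    
        // Hypothetical sketch of the suggested naming, not the actual patch:
        override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) {
          synchronized {
            // Same body as onBlockManagerGained above; only the names change.
          }
        }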

