Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/42#discussion_r10596945

--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---
@@ -106,121 +114,154 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     }
     description.map(d => stageIdToDescription(stage.stageId) = d)
 
-    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[StageInfo]())
-    stages += stage
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
+    stages(stage.stageId) = stage
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val sid = taskStart.task.stageId
-    val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive += taskStart.taskInfo
-    val taskList = stageIdToTaskInfos.getOrElse(
-      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList += ((taskStart.taskInfo, None, None))
-    stageIdToTaskInfos(sid) = taskList
+    val sid = taskStart.stageId
+    val taskInfo = taskStart.taskInfo
+    if (taskInfo != null) {
+      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
+      tasksActive(taskInfo.taskId) = taskInfo
+      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
+      taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
+      stageIdToTaskData(sid) = taskMap
+    }
   }
 
-  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult)
-    = synchronized {
+  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) {
     // Do nothing: because we don't do a deep copy of the TaskInfo, the TaskInfo in
     // stageToTaskInfos already has the updated status.
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val sid = taskEnd.task.stageId
-
-    // create executor summary map if necessary
-    val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
-      op = new HashMap[String, ExecutorSummary]())
-    executorSummaryMap.getOrElseUpdate(key = taskEnd.taskInfo.executorId,
-      op = new ExecutorSummary())
-
-    val executorSummary = executorSummaryMap.get(taskEnd.taskInfo.executorId)
-    executorSummary match {
-      case Some(y) => {
-        // first update failed-task, succeed-task
+    val sid = taskEnd.stageId
+    val info = taskEnd.taskInfo
+
+    if (info != null) {
+      // create executor summary map if necessary
+      val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
+        op = new HashMap[String, ExecutorSummary]())
+      executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)
+
+      val executorSummary = executorSummaryMap.get(info.executorId)
+      executorSummary match {
+        case Some(y) => {
+          // first update failed-task, succeed-task
+          taskEnd.reason match {
+            case Success =>
+              y.succeededTasks += 1
+            case _ =>
+              y.failedTasks += 1
+          }
+
+          // update duration
+          y.taskTime += info.duration
+
+          val metrics = taskEnd.taskMetrics
+          if (metrics != null) {
+            metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
+            metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+            y.memoryBytesSpilled += metrics.memoryBytesSpilled
+            y.diskBytesSpilled += metrics.diskBytesSpilled
+          }
+        }
+        case _ => {}
+      }
+
+      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
+      // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
+      tasksActive.remove(info.taskId)
+
+      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
-          case Success =>
-            y.succeededTasks += 1
+          case e: ExceptionFailure =>
+            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            (Some(e), e.metrics)
           case _ =>
-            y.failedTasks += 1
+            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+            (None, Option(taskEnd.taskMetrics))
         }
 
-        // update duration
-        y.taskTime += taskEnd.taskInfo.duration
+      stageIdToTime.getOrElseUpdate(sid, 0L)
+      val time = metrics.map(_.executorRunTime).getOrElse(0L)
+      stageIdToTime(sid) += time
+      totalTime += time
 
-        Option(taskEnd.taskMetrics).foreach { taskMetrics =>
-          taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
-          taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
-          y.memoryBytesSpilled += taskMetrics.memoryBytesSpilled
-          y.diskBytesSpilled += taskMetrics.diskBytesSpilled
-        }
-      }
-      case _ => {}
-    }
+      stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
+      val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
+      stageIdToShuffleRead(sid) += shuffleRead
+      totalShuffleRead += shuffleRead
 
-    val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive -= taskEnd.taskInfo
-
-    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
-      taskEnd.reason match {
-        case e: ExceptionFailure =>
-          stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
-          (Some(e), e.metrics)
-        case _ =>
-          stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
-          (None, Option(taskEnd.taskMetrics))
-      }
+      stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
+      val shuffleWrite =
+        metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
+      stageIdToShuffleWrite(sid) += shuffleWrite
+      totalShuffleWrite += shuffleWrite
 
-    stageIdToTime.getOrElseUpdate(sid, 0L)
-    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
-    stageIdToTime(sid) += time
-    totalTime += time
-
-    stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
-    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
-      s.remoteBytesRead).getOrElse(0L)
-    stageIdToShuffleRead(sid) += shuffleRead
-    totalShuffleRead += shuffleRead
-
-    stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
-    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
-      s.shuffleBytesWritten).getOrElse(0L)
-    stageIdToShuffleWrite(sid) += shuffleWrite
-    totalShuffleWrite += shuffleWrite
-
-    stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
-    val memoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled).getOrElse(0L)
-    stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
-
-    stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
-    val diskBytesSpilled = metrics.map(m => m.diskBytesSpilled).getOrElse(0L)
-    stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
-
-    val taskList = stageIdToTaskInfos.getOrElse(
-      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList -= ((taskEnd.taskInfo, None, None))
-    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-    stageIdToTaskInfos(sid) = taskList
+      stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
+      val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
+      stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+
+      stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
+      val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
+      stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
+
+      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
+      taskMap(info.taskId) = new TaskUIData(info, metrics, failureInfo)
+      stageIdToTaskData(sid) = taskMap
+    }
   }
 
   override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
-    jobEnd match {
-      case end: SparkListenerJobEnd =>
-        end.jobResult match {
-          case JobFailed(ex, Some(stage)) =>
-            /* If two jobs share a stage we could get this failure message twice. So we first
-             * check whether we've already retired this stage. */
-            val stageInfo = activeStages.filter(s => s.stageId == stage.id).headOption
-            stageInfo.foreach {s =>
-              activeStages -= s
-              poolToActiveStages(stageIdToPool(stage.id)) -= s
-              failedStages += s
-              trimIfNecessary(failedStages)
-            }
-          case _ =>
+    jobEnd.jobResult match {
+      case JobFailed(_, stageId) =>
+        activeStages.get(stageId).foreach { s =>
+          // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
+          activeStages.remove(s.stageId)
+          poolToActiveStages(stageIdToPool(stageId)).remove(s.stageId)
+          failedStages += s
+          trimIfNecessary(failedStages)
         }
       case _ =>
     }
   }
+
+  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
+    synchronized {
+      val schedulingModeName =
+        environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode")
+      schedulingMode = schedulingModeName match {
+        case Some(name) => Some(SchedulingMode.withName(name))
+        case None => None
+      }
+    }
+  }
+
+  override def onBlockManagerGained(blockManagerGained: SparkListenerBlockManagerGained) {
--- End diff --

Elsewhere in the code we tend to use `Added` for this type of thing, i.e. `onBlockManagerAdded`. That sounds a bit more natural to me than "gained".
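For what it's worth, a minimal sketch of the suggested naming, assuming a simple event case class and a no-op default in the listener trait (the trait name and the `String`/`Long` fields below are placeholders for illustration, not the actual API in this patch):

```scala
// Hypothetical sketch only: the event and callback names follow the suggested
// "Added" convention; field types and the trait are placeholders.
case class SparkListenerBlockManagerAdded(blockManagerId: String, maxMemory: Long)

trait ExampleSparkListener {
  // Called when a new block manager registers with the driver; the default is a
  // no-op so listeners only override the callbacks they care about.
  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }
}
```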