Github user ajbozarth commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19750#discussion_r151281491

    --- Diff: core/src/main/scala/org/apache/spark/SparkStatusTracker.scala ---
    @@ -89,39 +83,33 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
        * garbage collected.
        */
       def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
    -    jobProgressListener.synchronized {
    -      for (
    -        info <- jobProgressListener.stageIdToInfo.get(stageId);
    -        data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
    -      ) yield {
    -        new SparkStageInfoImpl(
    -          stageId,
    -          info.attemptId,
    -          info.submissionTime.getOrElse(0),
    -          info.name,
    -          info.numTasks,
    -          data.numActiveTasks,
    -          data.numCompleteTasks,
    -          data.numFailedTasks)
    -      }
    +    store.asOption(store.lastStageAttempt(stageId)).map { stage =>
    +      new SparkStageInfoImpl(
    +        stageId,
    +        stage.attemptId,
    +        stage.submissionTime.map(_.getTime()).getOrElse(0L),
    +        stage.name,
    +        stage.numTasks,
    +        stage.numActiveTasks,
    +        stage.numCompleteTasks,
    +        stage.numFailedTasks)
         }
       }
     
       /**
        * Returns information of all known executors, including host, port, cacheSize, numRunningTasks.
        */
       def getExecutorInfos: Array[SparkExecutorInfo] = {
    -    val executorIdToRunningTasks: Map[String, Int] =
    -      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
    -
    -    sc.getExecutorStorageStatus.map { status =>
    -      val bmId = status.blockManagerId
    +    store.executorList(true).map { exec =>
    +      val (host, port) = exec.hostPort.split(":", 2) match {
    +        case Array(h, p) => (h, p.toInt)
    +        case Array(h) => (h, -1)
    +      }
           new SparkExecutorInfoImpl(
    -        bmId.host,
    -        bmId.port,
    -        status.cacheSize,
    -        executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
    -      )
    -    }
    +        host,
    +        port,
    +        exec.maxMemory,
    --- End diff --
    
    Why this change from cacheSize to maxMemory? Are they synonymous?
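For reference, a minimal standalone sketch of the hostPort parsing the new getExecutorInfos introduces above. The splitHostPort helper name and the sample values are illustrative, not from the patch itself:

    // Hypothetical helper mirroring the match expression in the diff:
    // "host:port" yields (host, port); a bare "host" yields (host, -1).
    def splitHostPort(hostPort: String): (String, Int) =
      hostPort.split(":", 2) match {
        case Array(h, p) => (h, p.toInt)
        case Array(h)    => (h, -1) // port not yet known for this executor
      }

    splitHostPort("worker-1:7337") // ("worker-1", 7337)
    splitHostPort("worker-1")      // ("worker-1", -1)

The limit of 2 on split means a malformed value with extra colons would fail in p.toInt rather than silently drop data.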