tgravescs commented on a change in pull request #29906: URL: https://github.com/apache/spark/pull/29906#discussion_r510979634
########## File path: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ########## @@ -284,80 +284,127 @@ private[spark] class AppStatusListener( } override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = { - updateBlackListStatus(event.executorId, true) + updateExclusionStatus(event.executorId, true) + } + + override def onExecutorExcluded(event: SparkListenerExecutorExcluded): Unit = { + updateExclusionStatus(event.executorId, true) } override def onExecutorBlacklistedForStage( event: SparkListenerExecutorBlacklistedForStage): Unit = { - val now = System.nanoTime() + updateExclusionStatusForStage(event.stageId, event.stageAttemptId, event.executorId) + } - Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage => - setStageBlackListStatus(stage, now, event.executorId) - } - liveExecutors.get(event.executorId).foreach { exec => - addBlackListedStageTo(exec, event.stageId, now) - } + override def onExecutorExcludedForStage( + event: SparkListenerExecutorExcludedForStage): Unit = { + updateExclusionStatusForStage(event.stageId, event.stageAttemptId, event.executorId) } override def onNodeBlacklistedForStage(event: SparkListenerNodeBlacklistedForStage): Unit = { - val now = System.nanoTime() + updateNodeExclusionStatusForStage(event.stageId, event.stageAttemptId, event.hostId) + } - // Implicitly blacklist every available executor for the stage associated with this node - Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage => - val executorIds = liveExecutors.values.filter(_.host == event.hostId).map(_.executorId).toSeq - setStageBlackListStatus(stage, now, executorIds: _*) - } - liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec => - addBlackListedStageTo(exec, event.stageId, now) - } + override def onNodeExcludedForStage(event: SparkListenerNodeExcludedForStage): Unit = { + updateNodeExclusionStatusForStage(event.stageId, event.stageAttemptId, 
event.hostId) } - private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = { - exec.blacklistedInStages += stageId + private def addExcludedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = { + exec.excludedInStages += stageId liveUpdate(exec, now) } private def setStageBlackListStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = { executorIds.foreach { executorId => val executorStageSummary = stage.executorSummary(executorId) - executorStageSummary.isBlacklisted = true + executorStageSummary.isExcluded = true + maybeUpdate(executorStageSummary, now) + } + stage.excludedExecutors ++= executorIds + maybeUpdate(stage, now) + } + + private def setStageExcludedStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = { + executorIds.foreach { executorId => + val executorStageSummary = stage.executorSummary(executorId) + executorStageSummary.isExcluded = true maybeUpdate(executorStageSummary, now) } - stage.blackListedExecutors ++= executorIds + stage.excludedExecutors ++= executorIds maybeUpdate(stage, now) } override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = { - updateBlackListStatus(event.executorId, false) + updateExclusionStatus(event.executorId, false) + } + + override def onExecutorUnexcluded(event: SparkListenerExecutorUnexcluded): Unit = { + updateExclusionStatus(event.executorId, false) } override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = { - updateNodeBlackList(event.hostId, true) + updateNodeExcluded(event.hostId, true) + } + + override def onNodeExcluded(event: SparkListenerNodeExcluded): Unit = { + updateNodeExcluded(event.hostId, true) } override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = { - updateNodeBlackList(event.hostId, false) + updateNodeExcluded(event.hostId, false) + } + + override def onNodeUnexcluded(event: SparkListenerNodeUnexcluded): Unit = { + updateNodeExcluded(event.hostId, false) + } + + 
private def updateNodeExclusionStatusForStage(stageId: Int, stageAttemptId: Int, + hostId: String): Unit = { + val now = System.nanoTime() + + // Implicitly exclude every available executor for the stage associated with this node + Option(liveStages.get((stageId, stageAttemptId))).foreach { stage => + val executorIds = liveExecutors.values.filter(_.host == hostId).map(_.executorId).toSeq + setStageExcludedStatus(stage, now, executorIds: _*) + } + liveExecutors.values.filter(_.hostname == hostId).foreach { exec => + addExcludedStageTo(exec, stageId, now) + } + } + + private def updateExclusionStatusForStage(stageId: Int, stageAttemptId: Int, + execId: String): Unit = { + val now = System.nanoTime() + + Option(liveStages.get((stageId, stageAttemptId))).foreach { stage => + setStageExcludedStatus(stage, now, execId) + } + liveExecutors.get(execId).foreach { exec => + addExcludedStageTo(exec, stageId, now) + } } - private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = { + private def updateExclusionStatus(execId: String, excluded: Boolean): Unit = { liveExecutors.get(execId).foreach { exec => - exec.isBlacklisted = blacklisted - if (blacklisted) { + exec.isExcluded = excluded + if (excluded) { appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc()) + appStatusSource.foreach(_.EXCLUDED_EXECUTORS.inc()) Review comment: I updated this, but I also found a pre-existing bug: we weren't incrementing this counter when a node was excluded, even though excluding a node implicitly excludes its executors. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org