Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19751#discussion_r156718870

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -772,4 +813,118 @@ private[spark] class AppStatusListener(
     }
   }
 
+  private def cleanupExecutors(count: Long): Unit = {
+    // Because the limit is on the number of *dead* executors, we need to calculate whether
+    // there are actually enough dead executors to be deleted.
+    val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
+    val dead = count - activeExecutorCount
+
+    if (dead > threshold) {
+      val countToDelete = calculateNumberToRemove(dead, threshold)
+      val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
+        .max(countToDelete).first(false).last(false).asScala.toSeq
+      toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
+    }
+  }
+
+  private def cleanupJobs(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
+      countToDelete.toInt) { j =>
+      j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
+    }
+    toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
+  }
+
+  private def cleanupStages(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
+    if (countToDelete <= 0L) {
+      return
+    }
+
+    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
+      countToDelete.toInt) { s =>
+      s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
+    }
+
+    stages.foreach { s =>
+      val key = s.id
+      kvstore.delete(s.getClass(), key)
+
+      val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+        .toSeq
+      execSummaries.foreach { e =>
+        kvstore.delete(e.getClass(), e.id)
+      }
+
+      val tasks = kvstore.view(classOf[TaskDataWrapper])
+        .index("stage")
+        .first(key)
+        .last(key)
+        .asScala
+
+      tasks.foreach { t =>
+        kvstore.delete(t.getClass(), t.info.taskId)
+      }
+
+      // Check whether there are remaining attempts for the same stage. If there aren't, then
+      // also delete the RDD graph data.
+      val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
+        .index("stageId")
+        .first(s.stageId)
+        .last(s.stageId)
+        .closeableIterator()
+
+      val hasMoreAttempts = try {
+        remainingAttempts.asScala.exists { other =>
+          other.info.attemptId != s.info.attemptId
+        }
+      } finally {
+        remainingAttempts.close()
+      }
+
+      if (!hasMoreAttempts) {
+        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+      }
+    }
+  }
+
+  private def cleanupTasks(stage: LiveStage): Unit = {
+    val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage)
+    if (countToDelete > 0L) {
+      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
+      val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
+        .last(stageKey)
+
+      // On live applications, try to delete finished tasks only; when in the SHS, treat all
+      // tasks as the same.
+      val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
+        !live || t.info.status != TaskState.RUNNING.toString()
--- End diff --

In the SHS, wouldn't you still prefer to delete finished tasks over live ones? In all cases, should you really just try to delete finished tasks first, but still delete running tasks if need be?
I don't see any filter like this in the old code.
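To make that concrete, here is a rough sketch of the "finished first, then running if need be" ordering, reusing the names from cleanupTasks() in the diff above (view, countToDelete, kvstore, TaskDataWrapper). It is only an illustration of the idea, not code from this PR:

    // Sketch only: take finished tasks first, then top up with running tasks so the
    // stage still ends up under the retention limit.
    val finished = KVUtils.viewToSeq(view, countToDelete.toInt) { t =>
      t.info.status != TaskState.RUNNING.toString()
    }
    val stillNeeded = countToDelete.toInt - finished.size
    val running = if (stillNeeded > 0) {
      KVUtils.viewToSeq(view, stillNeeded) { t =>
        t.info.status == TaskState.RUNNING.toString()
      }
    } else {
      Nil
    }
    (finished ++ running).foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }

That way live applications and the SHS behave the same, and running tasks are only evicted as a last resort.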