This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new a415df3 [SPARK-35391] Fix memory leak in ExecutorAllocationListener a415df3 is described below commit a415df341a3d5f4e895c0ca186dd4ffeccc777da Author: Vasily Kolpakov <vasilykolpa...@gmail.com> AuthorDate: Mon Jun 21 08:23:20 2021 -0500 [SPARK-35391] Fix memory leak in ExecutorAllocationListener ### What changes were proposed in this pull request? This PR fixes a memory leak in ExecutorAllocationListener. ### Why are the changes needed? Dynamic allocation stops working under high load (~100 tasks/s, ~5 stages/s) in long-lived (~10 days) Spark applications. This PR addresses the problem. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual tests. The patch fixed dynamic allocation in a production cluster. Closes #32526 from VasilyKolpakov/SPARK-35391_fix_ExecutorAllocationListener. Authored-by: Vasily Kolpakov <vasilykolpa...@gmail.com> Signed-off-by: Thomas Graves <tgra...@apache.org> (cherry picked from commit 844f10c7426a76fb29ee91223c8af43825e147c5) Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../apache/spark/ExecutorAllocationManager.scala | 40 ++++++++++++++-------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index bdb768e..f2078f4 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -736,6 +736,7 @@ private[spark] class ExecutorAllocationManager( stageAttemptToTaskIndices -= stageAttempt stageAttemptToSpeculativeTaskIndices -= stageAttempt stageAttemptToExecutorPlacementHints -= stageAttempt + removeStageFromResourceProfileIfUnused(stageAttempt) // Update the executor placement hints updateExecutorPlacementHints() @@ -780,20 +781,7 @@ private[spark] class 
ExecutorAllocationManager( stageAttemptToNumRunningTask(stageAttempt) -= 1 if (stageAttemptToNumRunningTask(stageAttempt) == 0) { stageAttemptToNumRunningTask -= stageAttempt - if (!stageAttemptToNumTasks.contains(stageAttempt)) { - val rpForStage = resourceProfileIdToStageAttempt.filter { case (k, v) => - v.contains(stageAttempt) - }.keys - if (rpForStage.size == 1) { - // be careful about the removal from here due to late tasks, make sure stage is - // really complete and no tasks left - resourceProfileIdToStageAttempt(rpForStage.head) -= stageAttempt - } else { - logWarning(s"Should have exactly one resource profile for stage $stageAttempt," + - s" but have $rpForStage") - } - } - + removeStageFromResourceProfileIfUnused(stageAttempt) } } if (taskEnd.taskInfo.speculative) { @@ -858,6 +846,28 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { // Clear unschedulableTaskSets since atleast one task becomes schedulable now unschedulableTaskSets.remove(stageAttempt) + removeStageFromResourceProfileIfUnused(stageAttempt) + } + } + + def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt): Unit = { + if (!stageAttemptToNumRunningTask.contains(stageAttempt) && + !stageAttemptToNumTasks.contains(stageAttempt) && + !stageAttemptToNumSpeculativeTasks.contains(stageAttempt) && + !stageAttemptToTaskIndices.contains(stageAttempt) && + !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt) + ) { + val rpForStage = resourceProfileIdToStageAttempt.filter { case (k, v) => + v.contains(stageAttempt) + }.keys + if (rpForStage.size == 1) { + // be careful about the removal from here due to late tasks, make sure stage is + // really complete and no tasks left + resourceProfileIdToStageAttempt(rpForStage.head) -= stageAttempt + } else { + logWarning(s"Should have exactly one resource profile for stage $stageAttempt," + + s" but have $rpForStage") + } } } @@ -931,7 +941,7 @@ private[spark] class ExecutorAllocationManager( val 
attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq // attempts is a Set, change to Seq so we keep all values attempts.map { attempt => - stageAttemptToNumRunningTask.getOrElseUpdate(attempt, 0) + stageAttemptToNumRunningTask.getOrElse(attempt, 0) }.sum } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org