This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 844f10c  [SPARK-35391] Fix memory leak in ExecutorAllocationListener
844f10c is described below

commit 844f10c7426a76fb29ee91223c8af43825e147c5
Author: Vasily Kolpakov <vasilykolpa...@gmail.com>
AuthorDate: Mon Jun 21 08:23:20 2021 -0500

    [SPARK-35391] Fix memory leak in ExecutorAllocationListener
    
    ### What changes were proposed in this pull request?
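    This PR fixes a memory leak in ExecutorAllocationListener: stage attempts
    are now removed from resourceProfileIdToStageAttempt once none of the
    task-tracking maps reference them, and summing running tasks per resource
    profile no longer inserts zero-count entries into
    stageAttemptToNumRunningTask.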
    
    ### Why are the changes needed?
    Dynamic allocation stops working under high load (~100 tasks/s,
    ~5 stages/s) in long-lived (~10 days) Spark applications. This PR
    addresses the problem.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual tests. The patch fixed dynamic allocation in a production cluster.
    
    Closes #32526 from 
VasilyKolpakov/SPARK-35391_fix_ExecutorAllocationListener.
    
    Authored-by: Vasily Kolpakov <vasilykolpa...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../apache/spark/ExecutorAllocationManager.scala   | 40 ++++++++++++++--------
 1 file changed, 25 insertions(+), 15 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 779559b..c4b6193 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -737,6 +737,7 @@ private[spark] class ExecutorAllocationManager(
         stageAttemptToTaskIndices -= stageAttempt
         stageAttemptToSpeculativeTaskIndices -= stageAttempt
         stageAttemptToExecutorPlacementHints -= stageAttempt
+        removeStageFromResourceProfileIfUnused(stageAttempt)
 
         // Update the executor placement hints
         updateExecutorPlacementHints()
@@ -781,20 +782,7 @@ private[spark] class ExecutorAllocationManager(
           stageAttemptToNumRunningTask(stageAttempt) -= 1
           if (stageAttemptToNumRunningTask(stageAttempt) == 0) {
             stageAttemptToNumRunningTask -= stageAttempt
-            if (!stageAttemptToNumTasks.contains(stageAttempt)) {
-              val rpForStage = resourceProfileIdToStageAttempt.filter { case 
(k, v) =>
-                v.contains(stageAttempt)
-              }.keys
-              if (rpForStage.size == 1) {
-                // be careful about the removal from here due to late tasks, 
make sure stage is
-                // really complete and no tasks left
-                resourceProfileIdToStageAttempt(rpForStage.head) -= 
stageAttempt
-              } else {
-                logWarning(s"Should have exactly one resource profile for 
stage $stageAttempt," +
-                  s" but have $rpForStage")
-              }
-            }
-
+            removeStageFromResourceProfileIfUnused(stageAttempt)
           }
         }
         if (taskEnd.taskInfo.speculative) {
@@ -859,6 +847,28 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         // Clear unschedulableTaskSets since atleast one task becomes 
schedulable now
         unschedulableTaskSets.remove(stageAttempt)
+        removeStageFromResourceProfileIfUnused(stageAttempt)
+      }
+    }
+
+    def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt): 
Unit = {
+      if (!stageAttemptToNumRunningTask.contains(stageAttempt) &&
+          !stageAttemptToNumTasks.contains(stageAttempt) &&
+          !stageAttemptToNumSpeculativeTasks.contains(stageAttempt) &&
+          !stageAttemptToTaskIndices.contains(stageAttempt) &&
+          !stageAttemptToSpeculativeTaskIndices.contains(stageAttempt)
+      ) {
+        val rpForStage = resourceProfileIdToStageAttempt.filter { case (k, v) 
=>
+          v.contains(stageAttempt)
+        }.keys
+        if (rpForStage.size == 1) {
+          // be careful about the removal from here due to late tasks, make 
sure stage is
+          // really complete and no tasks left
+          resourceProfileIdToStageAttempt(rpForStage.head) -= stageAttempt
+        } else {
+          logWarning(s"Should have exactly one resource profile for stage 
$stageAttempt," +
+              s" but have $rpForStage")
+        }
       }
     }
 
@@ -920,7 +930,7 @@ private[spark] class ExecutorAllocationManager(
       val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, 
Set.empty).toSeq
       // attempts is a Set, change to Seq so we keep all values
       attempts.map { attempt =>
-        stageAttemptToNumRunningTask.getOrElseUpdate(attempt, 0)
+        stageAttemptToNumRunningTask.getOrElse(attempt, 0)
       }.sum
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to