Repository: spark
Updated Branches:
  refs/heads/branch-2.4 3d2fce5a3 -> 73894462c


[SPARK-25837][CORE] Fix potential slowdown in AppStatusListener when cleaning 
up stages

## What changes were proposed in this pull request?

* Update the `AppStatusListener` `cleanupStages` method to remove tasks for 
those stages in a single pass instead of one pass per stage.
* This fixes an issue where the `cleanupStages` method would fall behind, 
backing up the executor in `ElementTrackingStore` and leaving stages and jobs 
uncleaned.

Tasks seem most susceptible to this because there are so many of them; 
however, a similar issue could arise anywhere the `KVStore` `view` method is 
used. A broader fix might involve updates to `KVStoreView` and `InMemoryView`, 
as this interface and implementation can lead to multiple, inefficient 
traversals of the stored data.

## How was this patch tested?

Using the existing tests in `AppStatusListenerSuite`.

This is my original work and I license the work to the project under the 
project’s open source license.

Closes #22883 from patrickbrownsync/cleanup-stages-fix.

Authored-by: Patrick Brown <patrick.br...@blyncsy.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
(cherry picked from commit e9d3ca0b7993995f24f5c555a570bc2521119e12)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73894462
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73894462
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73894462

Branch: refs/heads/branch-2.4
Commit: 73894462cfb80b7c3e61c743b5a2f3be5d2282dd
Parents: 3d2fce5
Author: Patrick Brown <patrick.br...@blyncsy.com>
Authored: Thu Nov 1 09:34:29 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Nov 1 09:34:45 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala  | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73894462/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 513c929..fdbef6f 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -1002,16 +1002,6 @@ private[spark] class AppStatusListener(
         kvstore.delete(e.getClass(), e.id)
       }
 
-      val tasks = kvstore.view(classOf[TaskDataWrapper])
-        .index("stage")
-        .first(key)
-        .last(key)
-        .asScala
-
-      tasks.foreach { t =>
-        kvstore.delete(t.getClass(), t.taskId)
-      }
-
       // Check whether there are remaining attempts for the same stage. If 
there aren't, then
       // also delete the RDD graph data.
       val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
@@ -1034,6 +1024,15 @@ private[spark] class AppStatusListener(
 
       cleanupCachedQuantiles(key)
     }
+
+    // Delete tasks for all stages in one pass, as deleting them for each 
stage individually is slow
+    val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala
+    val keys = stages.map { s => (s.info.stageId, s.info.attemptId) }.toSet
+    tasks.foreach { t =>
+      if (keys.contains((t.stageId, t.stageAttemptId))) {
+        kvstore.delete(t.getClass(), t.taskId)
+      }
+    }
   }
 
   private def cleanupTasks(stage: LiveStage): Unit = {

