Repository: spark
Updated Branches:
  refs/heads/branch-2.4 3d2fce5a3 -> 73894462c

[SPARK-25837][CORE] Fix potential slowdown in AppStatusListener when cleaning up stages

## What changes were proposed in this pull request?

* Update the `AppStatusListener` `cleanupStages` method to remove tasks for the stages being cleaned up in a single pass instead of one pass per stage.
* This fixes an issue where the `cleanupStages` method would get backed up, causing a backup in the executor in `ElementTrackingStore`, resulting in stages and jobs not getting cleaned up properly.

Tasks seem most susceptible to this as there are a lot of them; however, a similar issue could arise in other locations where the `KVStore` `view` method is used. A broader fix might involve updates to `KVStoreView` and `InMemoryView`, as it appears this interface and implementation can lead to multiple and inefficient traversals of the stored data.

## How was this patch tested?

Using existing tests in AppStatusListenerSuite.

This is my original work and I license the work to the project under the project's open source license.

Closes #22883 from patrickbrownsync/cleanup-stages-fix.

Authored-by: Patrick Brown <patrick.br...@blyncsy.com>
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
(cherry picked from commit e9d3ca0b7993995f24f5c555a570bc2521119e12)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73894462
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73894462
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73894462

Branch: refs/heads/branch-2.4
Commit: 73894462cfb80b7c3e61c743b5a2f3be5d2282dd
Parents: 3d2fce5
Author: Patrick Brown <patrick.br...@blyncsy.com>
Authored: Thu Nov 1 09:34:29 2018 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Nov 1 09:34:45 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/73894462/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 513c929..fdbef6f 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -1002,16 +1002,6 @@ private[spark] class AppStatusListener(
         kvstore.delete(e.getClass(), e.id)
       }
 
-      val tasks = kvstore.view(classOf[TaskDataWrapper])
-        .index("stage")
-        .first(key)
-        .last(key)
-        .asScala
-
-      tasks.foreach { t =>
-        kvstore.delete(t.getClass(), t.taskId)
-      }
-
       // Check whether there are remaining attempts for the same stage. If there aren't, then
       // also delete the RDD graph data.
       val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
@@ -1034,6 +1024,15 @@ private[spark] class AppStatusListener(
 
       cleanupCachedQuantiles(key)
     }
+
+    // Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
+    val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala
+    val keys = stages.map { s => (s.info.stageId, s.info.attemptId) }.toSet
+    tasks.foreach { t =>
+      if (keys.contains((t.stageId, t.stageAttemptId))) {
+        kvstore.delete(t.getClass(), t.taskId)
+      }
+    }
   }
 
   private def cleanupTasks(stage: LiveStage): Unit = {
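
To illustrate why the single-pass deletion in the diff above can be cheaper, here is a minimal, self-contained Scala sketch. `TaskRecord`, `ToyTaskStore`, and `CleanupSketch` are hypothetical names and are not part of Spark. The toy store also assumes that every view walks all stored records, which is the behaviour the commit message suspects rather than something verified here.

```scala
import scala.collection.mutable

// Hypothetical stand-in for TaskDataWrapper: only the fields the patch reads.
final case class TaskRecord(taskId: Long, stageId: Int, stageAttemptId: Int)

// Hypothetical stand-in for the KVStore (NOT the Spark API): a map keyed by taskId.
// It counts visited records so the two cleanup strategies can be compared.
final class ToyTaskStore(initial: Seq[TaskRecord]) {
  private val tasks = mutable.LinkedHashMap(initial.map(t => t.taskId -> t): _*)
  var visited: Long = 0L

  // Assumption: a view traverses every stored record, whatever is being looked up.
  def view: List[TaskRecord] = {
    val snapshot = tasks.values.toList
    visited += snapshot.size
    snapshot
  }

  def delete(taskId: Long): Unit = { tasks.remove(taskId); () }

  def size: Int = tasks.size
}

object CleanupSketch {
  type StageKey = (Int, Int) // (stageId, stageAttemptId)

  // Shape of the old code: one traversal for each stage being cleaned up.
  def deletePerStage(store: ToyTaskStore, stages: Seq[StageKey]): Unit =
    stages.foreach { key =>
      store.view
        .filter(t => (t.stageId, t.stageAttemptId) == key)
        .foreach(t => store.delete(t.taskId))
    }

  // Shape of the new code: one traversal in total, with a set lookup per record.
  def deleteSinglePass(store: ToyTaskStore, stages: Seq[StageKey]): Unit = {
    val keys = stages.toSet
    store.view.foreach { t =>
      if (keys.contains((t.stageId, t.stageAttemptId))) store.delete(t.taskId)
    }
  }
}
```

In this toy model, cleaning up 2 of 4 stages with 5 tasks each visits 35 records with `deletePerStage` (20, then 15 after the first deletions) and 20 with `deleteSinglePass`. The gap widens as more stages are cleaned up at once, since the per-stage cost is paid once per stage instead of once overall.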