Repository: spark
Updated Branches:
  refs/heads/master c3002c4a6 -> 04d462f64


[SPARK-4495] Fix memory leak in JobProgressListener

This commit fixes a memory leak in JobProgressListener that I introduced in 
SPARK-2321 and adds a testing framework to ensure that it’s very difficult to 
inadvertently introduce new memory leaks.

This solution might be overkill, but the main idea is to partition 
JobProgressListener's state into three buckets: collections that should be 
empty once Spark is idle, collections that must obey some hard size limit, and 
collections that have a soft size limit (they can grow arbitrarily large when 
Spark is active but must shrink to fit within some bound after Spark becomes 
idle).

Based on this, we can write fairly generic tests that run workloads that submit 
more than `spark.ui.retainedStages` stages and `spark.ui.retainedJobs` jobs 
then check that these various collections' sizes obey their contracts.

Author: Josh Rosen <joshro...@databricks.com>

Closes #3372 from JoshRosen/SPARK-4495 and squashes the following commits:

c73fab5 [Josh Rosen] "data structures" -> collections
be72e81 [Josh Rosen] [SPARK-4495] Fix memory leaks in JobProgressListener


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04d462f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04d462f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04d462f6

Branch: refs/heads/master
Commit: 04d462f648aba7b18fc293b7189b86af70e421bc
Parents: c3002c4
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed Nov 19 16:50:21 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Wed Nov 19 16:50:21 2014 -0800

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     | 113 ++++++++++++++-----
 .../ui/jobs/JobProgressListenerSuite.scala      | 100 +++++++++++++---
 2 files changed, 170 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/04d462f6/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 8bbde51..ccdcf0e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,41 +40,108 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
 
   import JobProgressListener._
 
+  // Define a handful of type aliases so that data structures' types can serve 
as documentation.
+  // These type aliases are public because they're used in the types of public 
fields:
+
   type JobId = Int
   type StageId = Int
   type StageAttemptId = Int
+  type PoolName = String
+  type ExecutorId = String
 
-  // How many stages to remember
-  val retainedStages = conf.getInt("spark.ui.retainedStages", 
DEFAULT_RETAINED_STAGES)
-  // How many jobs to remember
-  val retailedJobs = conf.getInt("spark.ui.retainedJobs", 
DEFAULT_RETAINED_JOBS)
+  // Define all of our state:
 
+  // Jobs:
   val activeJobs = new HashMap[JobId, JobUIData]
   val completedJobs = ListBuffer[JobUIData]()
   val failedJobs = ListBuffer[JobUIData]()
   val jobIdToData = new HashMap[JobId, JobUIData]
 
+  // Stages:
   val activeStages = new HashMap[StageId, StageInfo]
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
   val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
   val stageIdToInfo = new HashMap[StageId, StageInfo]
-  
-  // Number of completed and failed stages, may not actually equal to 
completedStages.size and 
-  // failedStages.size respectively due to completedStage and failedStages 
only maintain the latest
-  // part of the stages, the earlier ones will be removed when there are too 
many stages for 
-  // memory sake.
+  val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
+  // Total of completed and failed stages that have ever been run.  These may 
be greater than
+  // `completedStages.size` and `failedStages.size` if we have run more stages 
or jobs than
+  // JobProgressListener's retention limits.
   var numCompletedStages = 0
   var numFailedStages = 0
 
-  // Map from pool name to a hash map (map from stage id to StageInfo).
-  val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
-
-  val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
+  // Misc:
+  val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
+  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
 
   var schedulingMode: Option[SchedulingMode] = None
 
-  def blockManagerIds = executorIdToBlockManagerId.values.toSeq
+  // To limit the total memory usage of JobProgressListener, we only track 
information for a fixed
+  // number of non-active jobs and stages (there is no limit for active jobs 
and stages):
+
+  val retainedStages = conf.getInt("spark.ui.retainedStages", 
DEFAULT_RETAINED_STAGES)
+  val retainedJobs = conf.getInt("spark.ui.retainedJobs", 
DEFAULT_RETAINED_JOBS)
+
+  // We can test for memory leaks by ensuring that collections that track 
non-active jobs and
+  // stages do not grow without bound and that collections for active 
jobs/stages eventually become
+  // empty once Spark is idle.  Let's partition our collections into ones that 
should be empty
+  // once Spark is idle and ones that should have a hard- or soft-limited 
sizes.
+  // These methods are used by unit tests, but they're defined here so that 
people don't forget to
+  // update the tests when adding new collections.  Some collections have 
multiple levels of
+  // nesting, etc, so this lets us customize our notion of "size" for each 
structure:
+
+  // These collections should all be empty once Spark is idle (no active 
stages / jobs):
+  private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, 
Int] = {
+    Map(
+      "activeStages" -> activeStages.size,
+      "activeJobs" -> activeJobs.size,
+      "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum
+    )
+  }
+
+  // These collections should stop growing once we have run at least 
`spark.ui.retainedStages`
+  // stages and `spark.ui.retainedJobs` jobs:
+  private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = {
+    Map(
+      "completedJobs" -> completedJobs.size,
+      "failedJobs" -> failedJobs.size,
+      "completedStages" -> completedStages.size,
+      "failedStages" -> failedStages.size
+    )
+  }
+  
+  // These collections may grow arbitrarily, but once Spark becomes idle they 
should shrink back to
+  // some bound based on the `spark.ui.retainedStages` and 
`spark.ui.retainedJobs` settings:
+  private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {
+    Map(
+      "jobIdToData" -> jobIdToData.size,
+      "stageIdToData" -> stageIdToData.size,
+      "stageIdToStageInfo" -> stageIdToInfo.size
+    )
+  }
+
+  /** If stages is too large, remove and garbage collect old stages */
+  private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = 
synchronized {
+    if (stages.size > retainedStages) {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stages.take(toRemove).foreach { s =>
+        stageIdToData.remove((s.stageId, s.attemptId))
+        stageIdToInfo.remove(s.stageId)
+      }
+      stages.trimStart(toRemove)
+    }
+  }
+
+  /** If jobs is too large, remove and garbage collect old jobs */
+  private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
+    if (jobs.size > retainedJobs) {
+      val toRemove = math.max(retainedJobs / 10, 1)
+      jobs.take(toRemove).foreach { job =>
+        jobIdToData.remove(job.jobId)
+      }
+      jobs.trimStart(toRemove)
+    }
+  }
 
   override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
     val jobGroup = 
Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
@@ -92,9 +159,11 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
     jobEnd.jobResult match {
       case JobSucceeded =>
         completedJobs += jobData
+        trimJobsIfNecessary(completedJobs)
         jobData.status = JobExecutionStatus.SUCCEEDED
       case JobFailed(exception) =>
         failedJobs += jobData
+        trimJobsIfNecessary(failedJobs)
         jobData.status = JobExecutionStatus.FAILED
     }
   }
@@ -118,23 +187,11 @@ class JobProgressListener(conf: SparkConf) extends 
SparkListener with Logging {
     if (stage.failureReason.isEmpty) {
       completedStages += stage
       numCompletedStages += 1
-      trimIfNecessary(completedStages)
+      trimStagesIfNecessary(completedStages)
     } else {
       failedStages += stage
       numFailedStages += 1
-      trimIfNecessary(failedStages)
-    }
-  }
-
-  /** If stages is too large, remove and garbage collect old stages */
-  private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
-    if (stages.size > retainedStages) {
-      val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s =>
-        stageIdToData.remove((s.stageId, s.attemptId))
-        stageIdToInfo.remove(s.stageId)
-      }
-      stages.trimStart(toRemove)
+      trimStagesIfNecessary(failedStages)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04d462f6/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 7c102cc..15c5b4e 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -28,32 +28,102 @@ import org.apache.spark.util.Utils
 
 class JobProgressListenerSuite extends FunSuite with LocalSparkContext with 
Matchers {
 
-  test("test LRU eviction of stages") {
-    val conf = new SparkConf()
-    conf.set("spark.ui.retainedStages", 5.toString)
-    val listener = new JobProgressListener(conf)
 
-    def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
-      SparkListenerStageSubmitted(stageInfo)
+  private def createStageStartEvent(stageId: Int) = {
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    SparkListenerStageSubmitted(stageInfo)
+  }
+
+  private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    if (failed) {
+      stageInfo.failureReason = Some("Failed!")
     }
+    SparkListenerStageCompleted(stageInfo)
+  }
+
+  private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
+    SparkListenerJobStart(jobId, stageIds)
+  }
 
-    def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
-      SparkListenerStageCompleted(stageInfo)
+  private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
+    val result = if (failed) JobFailed(new Exception("dummy failure")) else 
JobSucceeded
+    SparkListenerJobEnd(jobId, result)
+  }
+
+  private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean 
= false) {
+    val stageIds = jobId * 100 to jobId * 100 + 50
+    listener.onJobStart(createJobStartEvent(jobId, stageIds))
+    for (stageId <- stageIds) {
+      listener.onStageSubmitted(createStageStartEvent(stageId))
+      listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId 
% 2 == 0))
+    }
+    listener.onJobEnd(createJobEndEvent(jobId, shouldFail))
+  }
+
+  private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) {
+    listener.getSizesOfActiveStateTrackingCollections.foreach { case 
(fieldName, size) =>
+      assert(size === 0, s"$fieldName was not empty")
     }
+  }
+
+  test("test LRU eviction of stages") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)
 
     for (i <- 1 to 50) {
       listener.onStageSubmitted(createStageStartEvent(i))
       listener.onStageCompleted(createStageEndEvent(i))
     }
+    assertActiveJobsStateIsEmpty(listener)
 
     listener.completedStages.size should be (5)
-    listener.completedStages.count(_.stageId == 50) should be (1)
-    listener.completedStages.count(_.stageId == 49) should be (1)
-    listener.completedStages.count(_.stageId == 48) should be (1)
-    listener.completedStages.count(_.stageId == 47) should be (1)
-    listener.completedStages.count(_.stageId == 46) should be (1)
+    listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 
47, 46))
+  }
+
+  test("test LRU eviction of jobs") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    conf.set("spark.ui.retainedJobs", 5.toString)
+    val listener = new JobProgressListener(conf)
+
+    // Run a bunch of jobs to get the listener into a state where we've 
exceeded both the
+    // job and stage retention limits:
+    for (jobId <- 1 to 10) {
+      runJob(listener, jobId, shouldFail = false)
+    }
+    for (jobId <- 200 to 210) {
+      runJob(listener, jobId, shouldFail = true)
+    }
+    assertActiveJobsStateIsEmpty(listener)
+    // Snapshot the sizes of various soft- and hard-size-limited collections:
+    val softLimitSizes = listener.getSizesOfSoftSizeLimitedCollections
+    val hardLimitSizes = listener.getSizesOfHardSizeLimitedCollections
+    // Run some more jobs:
+    for (jobId <- 11 to 50) {
+      runJob(listener, jobId, shouldFail = false)
+      // We shouldn't exceed the hard / soft limit sizes after the jobs have 
finished:
+      listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+      listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
+    }
+
+    listener.completedJobs.size should be (5)
+    listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 
46))
+
+    for (jobId <- 51 to 100) {
+      runJob(listener, jobId, shouldFail = true)
+      // We shouldn't exceed the hard / soft limit sizes after the jobs have 
finished:
+      listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
+      listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
+    }
+    assertActiveJobsStateIsEmpty(listener)
+
+    // Completed and failed jobs each their own size limits, so this should 
still be the same:
+    listener.completedJobs.size should be (5)
+    listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 
46))
+    listener.failedJobs.size should be (5)
+    listener.failedJobs.map(_.jobId).toSet should be (Set(100, 99, 98, 97, 96))
   }
 
   test("test executor id to summary") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to