Repository: spark Updated Branches: refs/heads/branch-1.3 e46096b1e -> 28dd53b1b
[Spark-5967] [UI] Correctly clean JobProgressListener.stageIdToActiveJobIds Patch should be self-explanatory pwendell JoshRosen Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #4741 from tdas/SPARK-5967 and squashes the following commits: 653b5bb [Tathagata Das] Fixed the fix and added test e2de972 [Tathagata Das] Clear stages which have no corresponding active jobs. (cherry picked from commit 64d2c01ff1048de83b9b8efce987b55e457298f9) Signed-off-by: Andrew Or <and...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28dd53b1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28dd53b1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28dd53b1 Branch: refs/heads/branch-1.3 Commit: 28dd53b1b613ba010dd4402d0744d6ebdd422fb5 Parents: e46096b Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Tue Feb 24 11:02:47 2015 -0800 Committer: Andrew Or <and...@databricks.com> Committed: Tue Feb 24 11:02:54 2015 -0800 ---------------------------------------------------------------------- .../spark/ui/jobs/JobProgressListener.scala | 3 +++ .../ui/jobs/JobProgressListenerSuite.scala | 22 ++++++++++++++++++++ 2 files changed, 25 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/28dd53b1/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 0b6fe70..937d95a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { for (stageId <- jobData.stageIds) { stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage => jobsUsingStage.remove(jobEnd.jobId) + if (jobsUsingStage.isEmpty) { + stageIdToActiveJobIds.remove(stageId) + } stageIdToInfo.get(stageId).foreach { stageInfo => if (stageInfo.submissionTime.isEmpty) { // if this stage is pending, it won't complete, so mark it as "skipped": http://git-wip-us.apache.org/repos/asf/spark/blob/28dd53b1/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 6019282..730a4b5 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46)) } + test("test clearing of stageIdToActiveJobs") { + val conf = new SparkConf() + conf.set("spark.ui.retainedStages", 5.toString) + val listener = new JobProgressListener(conf) + val jobId = 0 + val stageIds = 1 to 50 + // Start a job with 50 stages + listener.onJobStart(createJobStartEvent(jobId, stageIds)) + for (stageId <- stageIds) { + listener.onStageSubmitted(createStageStartEvent(stageId)) + } + listener.stageIdToActiveJobIds.size should be > 0 + + // Complete the stages and job + for (stageId <- stageIds) { + listener.onStageCompleted(createStageEndEvent(stageId, failed = false)) + } + listener.onJobEnd(createJobEndEvent(jobId, false)) + assertActiveJobsStateIsEmpty(listener) + listener.stageIdToActiveJobIds.size should be (0) + } + test("test LRU eviction of jobs") { val conf = new SparkConf() conf.set("spark.ui.retainedStages", 5.toString) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org