Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e46096b1e -> 28dd53b1b


[SPARK-5967] [UI] Correctly clean JobProgressListener.stageIdToActiveJobIds

The patch should be self-explanatory.
/cc pwendell JoshRosen

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #4741 from tdas/SPARK-5967 and squashes the following commits:

653b5bb [Tathagata Das] Fixed the fix and added test
e2de972 [Tathagata Das] Clear stages which have no corresponding active jobs.

(cherry picked from commit 64d2c01ff1048de83b9b8efce987b55e457298f9)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28dd53b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28dd53b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28dd53b1

Branch: refs/heads/branch-1.3
Commit: 28dd53b1b613ba010dd4402d0744d6ebdd422fb5
Parents: e46096b
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Feb 24 11:02:47 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Feb 24 11:02:54 2015 -0800

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     |  3 +++
 .../ui/jobs/JobProgressListenerSuite.scala      | 22 ++++++++++++++++++++
 2 files changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28dd53b1/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 0b6fe70..937d95a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     for (stageId <- jobData.stageIds) {
       stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
         jobsUsingStage.remove(jobEnd.jobId)
+        if (jobsUsingStage.isEmpty) {
+          stageIdToActiveJobIds.remove(stageId)
+        }
         stageIdToInfo.get(stageId).foreach { stageInfo =>
           if (stageInfo.submissionTime.isEmpty) {
             // if this stage is pending, it won't complete, so mark it as "skipped":

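For context on the hunk above: onJobEnd already removed the finished job's ID from each stage's entry in stageIdToActiveJobIds, but the emptied set was left in the map, so the map grew with every stage ever submitted. Below is a minimal, self-contained Scala sketch of the remove-when-empty bookkeeping the patch introduces; the object and method names are illustrative only and are not Spark's actual classes.

    import scala.collection.mutable.{HashMap, HashSet}

    // Toy model of the listener's bookkeeping: each stage ID maps to the set
    // of active job IDs that still reference that stage.
    object StageJobBookkeeping {
      val stageIdToActiveJobIds = new HashMap[Int, HashSet[Int]]()

      def onJobStart(jobId: Int, stageIds: Seq[Int]): Unit = {
        for (stageId <- stageIds) {
          stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[Int]()) += jobId
        }
      }

      def onJobEnd(jobId: Int, stageIds: Seq[Int]): Unit = {
        for (stageId <- stageIds) {
          stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
            jobsUsingStage.remove(jobId)
            // The fix: once no active job references the stage, drop the map
            // entry entirely instead of keeping an empty set around forever.
            if (jobsUsingStage.isEmpty) {
              stageIdToActiveJobIds.remove(stageId)
            }
          }
        }
      }
    }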
http://git-wip-us.apache.org/repos/asf/spark/blob/28dd53b1/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 6019282..730a4b5 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
   }
 
+  test("test clearing of stageIdToActiveJobs") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)
+    val jobId = 0
+    val stageIds = 1 to 50
+    // Start a job with 50 stages
+    listener.onJobStart(createJobStartEvent(jobId, stageIds))
+    for (stageId <- stageIds) {
+      listener.onStageSubmitted(createStageStartEvent(stageId))
+    }
+    listener.stageIdToActiveJobIds.size should be > 0
+
+    // Complete the stages and job
+    for (stageId <- stageIds) {
+      listener.onStageCompleted(createStageEndEvent(stageId, failed = false))
+    }
+    listener.onJobEnd(createJobEndEvent(jobId, false))
+    assertActiveJobsStateIsEmpty(listener)
+    listener.stageIdToActiveJobIds.size should be (0)
+  }
+
   test("test LRU eviction of jobs") {
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)

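The new test drives the real JobProgressListener through helper methods that are private to the suite (createJobStartEvent, createStageStartEvent, createStageEndEvent, createJobEndEvent). The same before/after behaviour can be demonstrated with the toy StageJobBookkeeping model sketched earlier in this message; the snippet below is purely illustrative and reuses that model rather than Spark code.

    // Run many short jobs through the toy model. With the isEmpty check the
    // map drains back to zero entries once every job has finished; without it,
    // one empty entry per stage ever submitted would be retained.
    object LeakDemo extends App {
      for (jobId <- 0 until 100) {
        val stageIds = (jobId * 50) until (jobId * 50 + 50)
        StageJobBookkeeping.onJobStart(jobId, stageIds)
        StageJobBookkeeping.onJobEnd(jobId, stageIds)
      }
      assert(StageJobBookkeeping.stageIdToActiveJobIds.isEmpty,
        "stageIdToActiveJobIds should be empty once all jobs have finished")
      println(s"entries left after 100 jobs: ${StageJobBookkeeping.stageIdToActiveJobIds.size}")
    }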
