Repository: spark
Updated Branches:
  refs/heads/branch-2.3 23ba4416e -> a0d794989


[SPARK-23475][WEBUI] Skipped stages should be evicted before completed stages

## What changes were proposed in this pull request?

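The root cause of the missing completed stages is that `cleanupStages` never 
removes skipped stages: its eviction scan starts at `completionTime` 0, so 
skipped stages (whose completion time is -1) are never considered, and 
completed stages get evicted in their place.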

This PR changes the logic to always remove skipped stages first. This is safe 
since the job itself contains enough information to render skipped stages in 
the UI.

## How was this patch tested?

A new unit test in `AppStatusListenerSuite`.

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #20656 from zsxwing/SPARK-23475.

(cherry picked from commit 45cf714ee6d4eead2fe00794a0d754fa6d33d4a6)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0d79498
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0d79498
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0d79498

Branch: refs/heads/branch-2.3
Commit: a0d7949896e70f427e7f3942ff340c9484ff0aab
Parents: 23ba441
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Wed Feb 21 19:43:11 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Wed Feb 21 19:43:22 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala |  5 ++-
 .../spark/status/AppStatusListenerSuite.scala   | 36 ++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a0d79498/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 8ac0291..496165c 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -863,7 +863,10 @@ private[spark] class AppStatusListener(
       return
     }
 
-    val view = 
kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
+    // As the completion time of a skipped stage is always -1, we will remove 
skipped stages first.
+    // This is safe since the job itself contains enough information to render 
skipped stages in the
+    // UI.
+    val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime")
     val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s =>
       s.info.status != v1.StageStatus.ACTIVE && s.info.status != 
v1.StageStatus.PENDING
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a0d79498/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index f3fa4c9..eb03ef3 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1025,6 +1025,42 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
     }
   }
 
+  test("skipped stages should be evicted before completed stages") {
+    val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2)
+    val listener = new AppStatusListener(store, testConf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+
+    // Start job 1
+    time += 1
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), 
null))
+
+    // Start and stop stage 1
+    time += 1
+    stage1.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new 
Properties()))
+
+    time += 1
+    stage1.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Stop job 1 and stage 2 will become SKIPPED
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    // Submit stage 3 and verify stage 2 is evicted
+    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+    time += 1
+    stage3.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new 
Properties()))
+
+    assert(store.count(classOf[StageDataWrapper]) === 2)
+    intercept[NoSuchElementException] {
+      store.read(classOf[StageDataWrapper], Array(2, 0))
+    }
+  }
+
   test("eviction should respect task completion time") {
     val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2)
     val listener = new AppStatusListener(store, testConf, true)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to