Repository: spark
Updated Branches:
  refs/heads/master 3ee9695d1 -> e87741589


[SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests

## What changes were proposed in this pull request?

Make spill tests wait until the job has completed before returning the number of stages that spilled.
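
For context, the flakiness comes from Spark's asynchronous listener bus: a test could read the spill count before the final task-end and job-end events had been delivered, so `numSpilledStages` now blocks on a latch that is released in `onJobEnd` (see the diff below). Below is a hedged usage sketch of that pattern, assuming a local `SparkContext` and a job tuned to spill; the object name and the job itself are illustrative and not taken from `ExternalAppendOnlyMapSuite`:

```scala
// Illustrative sketch only, not from this patch or from ExternalAppendOnlyMapSuite.
// SpillListener is package-private, so a sketch like this has to live under org.apache.spark.
package org.apache.spark

object SpillListenerUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("spill-listener-sketch"))

    val listener = new SpillListener   // the class modified by this patch (TestUtils.scala)
    sc.addSparkListener(listener)

    // A shuffle-heavy job; whether it actually spills depends on memory
    // configuration, which a real test would tune and which is omitted here.
    sc.parallelize(1 to 100000).map(i => (i % 10, i)).groupByKey().count()

    // With the fix, this call waits (up to 10 seconds) for SparkListenerJobEnd
    // before counting, instead of racing the asynchronous listener bus.
    println(s"stages that spilled: ${listener.numSpilledStages}")

    sc.stop()
  }
}
```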

## How was this patch tested?

Existing Jenkins tests.

Author: Sean Owen <so...@cloudera.com>

Closes #13896 from srowen/SPARK-16193.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8774158
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8774158
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8774158

Branch: refs/heads/master
Commit: e87741589a24821b5fe73e5d9ee2164247998580
Parents: 3ee9695
Author: Sean Owen <so...@cloudera.com>
Authored: Sat Jun 25 12:14:14 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Jun 25 12:14:14 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/TestUtils.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e8774158/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 43c89b2..871b9d1 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -22,6 +22,7 @@ import java.net.{URI, URL}
 import java.nio.charset.StandardCharsets
 import java.nio.file.Paths
 import java.util.Arrays
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 
 import scala.collection.JavaConverters._
@@ -190,8 +191,14 @@ private[spark] object TestUtils {
 private class SpillListener extends SparkListener {
   private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]]
   private val spilledStageIds = new mutable.HashSet[Int]
+  private val stagesDone = new CountDownLatch(1)
 
-  def numSpilledStages: Int = spilledStageIds.size
+  def numSpilledStages: Int = {
+    // Long timeout, just in case somehow the job end isn't notified.
+    // Fails if a timeout occurs
+    assert(stagesDone.await(10, TimeUnit.SECONDS))
+    spilledStageIds.size
+  }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
     stageIdToTaskMetrics.getOrElseUpdate(
@@ -206,4 +213,8 @@ private class SpillListener extends SparkListener {
       spilledStageIds += stageId
     }
   }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+    stagesDone.countDown()
+  }
 }
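
A side note on the latch call relied on above: `CountDownLatch.await(timeout, unit)` returns `true` if the count reached zero and `false` on timeout, so wrapping it in `assert` turns a missed `onJobEnd` notification into a test failure instead of a silent stale read. A tiny standalone illustration (not part of the patch):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchDemo {
  def main(args: Array[String]): Unit = {
    val latch = new CountDownLatch(1)

    // No countDown() yet, so a bounded await times out and returns false.
    println(latch.await(100, TimeUnit.MILLISECONDS))  // false

    latch.countDown()

    // Once the count reaches zero, await returns true immediately.
    println(latch.await(100, TimeUnit.MILLISECONDS))  // true
  }
}
```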

