spark git commit: [SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests
Repository: spark Updated Branches: refs/heads/branch-1.6 24d59fb64 -> 60e095b9b [SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests ## What changes were proposed in this pull request? Make spill tests wait until job has completed before returning the number of stages that spilled ## How was this patch tested? Existing Jenkins tests. Author: Sean Owen Closes #13896 from srowen/SPARK-16193. (cherry picked from commit e87741589a24821b5fe73e5d9ee2164247998580) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60e095b9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60e095b9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60e095b9 Branch: refs/heads/branch-1.6 Commit: 60e095b9bea3caa3e9d1e768d116f911a048d8ec Parents: 24d59fb Author: Sean Owen Authored: Sat Jun 25 12:14:14 2016 +0100 Committer: Sean Owen Committed: Sat Jun 25 12:14:40 2016 +0100 -- core/src/main/scala/org/apache/spark/TestUtils.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/60e095b9/core/src/main/scala/org/apache/spark/TestUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 43c89b2..871b9d1 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -22,6 +22,7 @@ import java.net.{URI, URL} import java.nio.charset.StandardCharsets import java.nio.file.Paths import java.util.Arrays +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream} import scala.collection.JavaConverters._ @@ -190,8 +191,14 @@ private[spark] object TestUtils { private class SpillListener extends SparkListener { private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]] private val spilledStageIds = new 
mutable.HashSet[Int] + private val stagesDone = new CountDownLatch(1) - def numSpilledStages: Int = spilledStageIds.size + def numSpilledStages: Int = { +// Long timeout, just in case somehow the job end isn't notified. +// Fails if a timeout occurs +assert(stagesDone.await(10, TimeUnit.SECONDS)) +spilledStageIds.size + } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { stageIdToTaskMetrics.getOrElseUpdate( @@ -206,4 +213,8 @@ private class SpillListener extends SparkListener { spilledStageIds += stageId } } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { +stagesDone.countDown() + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests
Repository: spark Updated Branches: refs/heads/master 3ee9695d1 -> e87741589 [SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests ## What changes were proposed in this pull request? Make spill tests wait until job has completed before returning the number of stages that spilled ## How was this patch tested? Existing Jenkins tests. Author: Sean Owen Closes #13896 from srowen/SPARK-16193. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8774158 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8774158 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8774158 Branch: refs/heads/master Commit: e87741589a24821b5fe73e5d9ee2164247998580 Parents: 3ee9695 Author: Sean Owen Authored: Sat Jun 25 12:14:14 2016 +0100 Committer: Sean Owen Committed: Sat Jun 25 12:14:14 2016 +0100 -- core/src/main/scala/org/apache/spark/TestUtils.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e8774158/core/src/main/scala/org/apache/spark/TestUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 43c89b2..871b9d1 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -22,6 +22,7 @@ import java.net.{URI, URL} import java.nio.charset.StandardCharsets import java.nio.file.Paths import java.util.Arrays +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream} import scala.collection.JavaConverters._ @@ -190,8 +191,14 @@ private[spark] object TestUtils { private class SpillListener extends SparkListener { private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]] private val spilledStageIds = new mutable.HashSet[Int] + private val stagesDone = new CountDownLatch(1) - def numSpilledStages: Int = 
spilledStageIds.size + def numSpilledStages: Int = { +// Long timeout, just in case somehow the job end isn't notified. +// Fails if a timeout occurs +assert(stagesDone.await(10, TimeUnit.SECONDS)) +spilledStageIds.size + } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { stageIdToTaskMetrics.getOrElseUpdate( @@ -206,4 +213,8 @@ private class SpillListener extends SparkListener { spilledStageIds += stageId } } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { +stagesDone.countDown() + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests
Repository: spark Updated Branches: refs/heads/branch-2.0 cbfcdcfb6 -> b03b0976f [SPARK-16193][TESTS] Address flaky ExternalAppendOnlyMapSuite spilling tests ## What changes were proposed in this pull request? Make spill tests wait until job has completed before returning the number of stages that spilled ## How was this patch tested? Existing Jenkins tests. Author: Sean Owen Closes #13896 from srowen/SPARK-16193. (cherry picked from commit e87741589a24821b5fe73e5d9ee2164247998580) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b03b0976 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b03b0976 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b03b0976 Branch: refs/heads/branch-2.0 Commit: b03b0976fac878bf7e5d1721441179a4d4d9c317 Parents: cbfcdcf Author: Sean Owen Authored: Sat Jun 25 12:14:14 2016 +0100 Committer: Sean Owen Committed: Sat Jun 25 12:14:24 2016 +0100 -- core/src/main/scala/org/apache/spark/TestUtils.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b03b0976/core/src/main/scala/org/apache/spark/TestUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 43c89b2..871b9d1 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -22,6 +22,7 @@ import java.net.{URI, URL} import java.nio.charset.StandardCharsets import java.nio.file.Paths import java.util.Arrays +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.jar.{JarEntry, JarOutputStream} import scala.collection.JavaConverters._ @@ -190,8 +191,14 @@ private[spark] object TestUtils { private class SpillListener extends SparkListener { private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]] private val spilledStageIds = new 
mutable.HashSet[Int] + private val stagesDone = new CountDownLatch(1) - def numSpilledStages: Int = spilledStageIds.size + def numSpilledStages: Int = { +// Long timeout, just in case somehow the job end isn't notified. +// Fails if a timeout occurs +assert(stagesDone.await(10, TimeUnit.SECONDS)) +spilledStageIds.size + } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { stageIdToTaskMetrics.getOrElseUpdate( @@ -206,4 +213,8 @@ private class SpillListener extends SparkListener { spilledStageIds += stageId } } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { +stagesDone.countDown() + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org