Repository: spark Updated Branches: refs/heads/branch-1.4 59399a8f0 -> 306837e4e
[SPARK-8001] [CORE] Make AsynchronousListenerBus.waitUntilEmpty throw TimeoutException if timeout Some places forget to call `assert` to check the return value of `AsynchronousListenerBus.waitUntilEmpty`. Instead of adding `assert` in these places, I think it's better to make `AsynchronousListenerBus.waitUntilEmpty` throw `TimeoutException`. Author: zsxwing <zsxw...@gmail.com> Closes #6550 from zsxwing/SPARK-8001 and squashes the following commits: 607674a [zsxwing] Make AsynchronousListenerBus.waitUntilEmpty throw TimeoutException if timeout (cherry picked from commit 1d8669f15c136cd81f494dd487400c62c9498602) Signed-off-by: Andrew Or <and...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/306837e4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/306837e4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/306837e4 Branch: refs/heads/branch-1.4 Commit: 306837e4e31e95ef9b66356ea13f17c2e2b2e7bd Parents: 59399a8 Author: zsxwing <zsxw...@gmail.com> Authored: Wed Jun 3 15:03:07 2015 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Wed Jun 3 15:03:15 2015 -0700 ---------------------------------------------------------------------- .../spark/util/AsynchronousListenerBus.scala | 11 +++++----- .../spark/deploy/LogUrlsStandaloneSuite.scala | 4 ++-- .../spark/scheduler/DAGSchedulerSuite.scala | 18 ++++++++-------- .../spark/scheduler/SparkListenerSuite.scala | 22 ++++++++++---------- .../SparkListenerWithClusterSuite.scala | 2 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 2 +- 6 files changed, 30 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/306837e4/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index 1861d38..61b5a4c 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -120,21 +120,22 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri /** * For testing only. Wait until there are no more events in the queue, or until the specified - * time has elapsed. Return true if the queue has emptied and false is the specified time - * elapsed before the queue emptied. + * time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue + * emptied. */ @VisibleForTesting - def waitUntilEmpty(timeoutMillis: Int): Boolean = { + @throws(classOf[TimeoutException]) + def waitUntilEmpty(timeoutMillis: Long): Unit = { val finishTime = System.currentTimeMillis + timeoutMillis while (!queueIsEmpty) { if (System.currentTimeMillis > finishTime) { - return false + throw new TimeoutException( + s"The event queue is not empty after $timeoutMillis milliseconds") } /* Sleep rather than using wait/notify, because this is used only for testing and * wait/notify add overhead in the general case. */ Thread.sleep(10) } - true } /** http://git-wip-us.apache.org/repos/asf/spark/blob/306837e4/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index c93d16f..82f506c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -43,7 +43,7 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext { // Trigger a job so that executors get added sc.parallelize(1 to 100, 4).map(_.toString).count() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.addedExecutorInfos.values.foreach { info => assert(info.logUrlMap.nonEmpty) // Browse to each URL to check that it's valid @@ -73,7 +73,7 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext { // Trigger a job so that executors get added sc.parallelize(1 to 100, 4).map(_.toString).count() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo] assert(listeners.size === 1) val listener = listeners(0) http://git-wip-us.apache.org/repos/asf/spark/blob/306837e4/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 89b6bc9..865445d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -254,7 +254,7 @@ class DAGSchedulerSuite test("[SPARK-3353] parent stage should have lower stage id") { sparkListener.stageByOrderOfExecution.clear() sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.stageByOrderOfExecution.length === 2) assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1)) } @@ -362,7 +362,7 @@ class DAGSchedulerSuite submit(unserializableRdd, Array(0)) assert(failure.getMessage.startsWith( "Job aborted due to stage failure: Task not serializable:")) - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.contains(0)) assert(sparkListener.failedStages.size === 1) assertDataStructuresEmpty() @@ -372,7 +372,7 @@ class DAGSchedulerSuite submit(new MyRDD(sc, 1, Nil), Array(0)) failed(taskSets(0), "some failure") assert(failure.getMessage === "Job aborted due to stage failure: some failure") - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.contains(0)) assert(sparkListener.failedStages.size === 1) assertDataStructuresEmpty() @@ -383,7 +383,7 @@ class DAGSchedulerSuite val jobId = submit(rdd, Array(0)) cancel(jobId) assert(failure.getMessage === s"Job $jobId cancelled ") - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.contains(0)) assert(sparkListener.failedStages.size === 1) assertDataStructuresEmpty() @@ -435,7 +435,7 @@ class DAGSchedulerSuite assert(results === Map(0 -> 42)) assertDataStructuresEmpty() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.isEmpty) assert(sparkListener.successfulStages.contains(0)) } @@ -504,7 +504,7 @@ class DAGSchedulerSuite Map[Long, Any](), createFakeTaskInfo(), null)) - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.contains(1)) // The second ResultTask fails, with a fetch failure for the output from the second mapper. @@ -516,7 +516,7 @@ class DAGSchedulerSuite createFakeTaskInfo(), null)) // The SparkListener should not receive redundant failure events. - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.size == 1) } @@ -565,7 +565,7 @@ class DAGSchedulerSuite // Listener bus should get told about the map stage failing, but not the reduce stage // (since the reduce stage hasn't been started yet). - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.toSet === Set(0)) assertDataStructuresEmpty() @@ -616,7 +616,7 @@ class DAGSchedulerSuite assert(cancelledStages.toSet === Set(0, 2)) // Make sure the listeners got told about both failed stages. - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.successfulStages.isEmpty) assert(sparkListener.failedStages.toSet === Set(0, 2)) http://git-wip-us.apache.org/repos/asf/spark/blob/306837e4/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 825c616..1c84a88 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -47,7 +47,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers // Starting listener bus should flush all buffered events bus.start(sc) - assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(counter.count === 5) // After listener bus has stopped, posting events should not increment counter @@ -131,7 +131,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers rdd2.setName("Target RDD") rdd2.count() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be {1} val (stageInfo, taskInfoMetrics) = listener.stageInfos.head @@ -156,7 +156,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers rdd3.setName("Trois") rdd1.count() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be {1} val stageInfo1 = listener.stageInfos.keys.find(_.stageId == 0).get stageInfo1.rddInfos.size should be {1} // ParallelCollectionRDD @@ -165,7 +165,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers listener.stageInfos.clear() rdd2.count() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be {1} val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get stageInfo2.rddInfos.size should be {3} // ParallelCollectionRDD, FilteredRDD, MappedRDD @@ -174,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers listener.stageInfos.clear() rdd3.count() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be {2} // Shuffle map stage + result stage val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get stageInfo3.rddInfos.size should be {1} // ShuffledRDD @@ -190,7 +190,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers val rdd2 = rdd1.map(_.toString) sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true) - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be {1} val (stageInfo, _) = listener.stageInfos.head @@ -214,7 +214,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers val d = sc.parallelize(0 to 1e4.toInt, 64).map(w) d.count() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be (1) val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1") @@ -225,7 +225,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers d4.setName("A Cogroup") d4.collectAsMap() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) listener.stageInfos.size should be (4) listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) => /** @@ -281,7 +281,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers .reduce { case (x, y) => x } assert(result === 1.to(akkaFrameSize).toArray) - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) val TASK_INDEX = 0 assert(listener.startedTasks.contains(TASK_INDEX)) assert(listener.startedGettingResultTasks.contains(TASK_INDEX)) @@ -297,7 +297,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x } assert(result === 2) - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) val TASK_INDEX = 0 assert(listener.startedTasks.contains(TASK_INDEX)) assert(listener.startedGettingResultTasks.isEmpty) @@ -352,7 +352,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers // Post events to all listeners, and wait until the queue is drained (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } - assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // The exception should be caught, and the event should be propagated to other listeners assert(bus.listenerThreadIsAlive) http://git-wip-us.apache.org/repos/asf/spark/blob/306837e4/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala index 623a687..10c43b8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala @@ -46,7 +46,7 @@ class SparkListenerWithClusterSuite extends FunSuite with LocalSparkContext rdd2.setName("Target RDD") rdd2.count() - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(listener.addedExecutorInfo.size == 2) assert(listener.addedExecutorInfo("0").totalCores == 1) assert(listener.addedExecutorInfo("1").totalCores == 1) http://git-wip-us.apache.org/repos/asf/spark/blob/306837e4/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d3c606e..8943616 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -318,7 +318,7 @@ private object YarnClusterDriver extends Logging with Matchers { var result = "failure" try { val data = sc.parallelize(1 to 4, 4).collect().toSet - assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) data should be (Set(1, 2, 3, 4)) result = "success" } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org