Repository: spark
Updated Branches:
  refs/heads/master 33bae585d -> 6518ef630


[SPARK-9948] Fix flaky AccumulatorSuite - internal accumulators

In these tests, we use a custom listener and assert on fields of the stage
and task completion events. However, those events are posted on a separate
listener-bus thread, so they are not guaranteed to have been delivered by
the time the test body runs its assertions. This commit removes the
flakiness by registering the assertions in a job-completion callback on the
listener, so they run only after the job end event has been received
(sketched below).
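
A condensed sketch of the pattern, assembled from the diff below (the
SparkContext/test-harness setup, the task-info bookkeeping, and the other
suites are elided; listener, rdd and the === matcher in the usage snippet
come from that elided harness):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.scheduler._

    // Records completed stage infos and invokes a caller-supplied callback
    // once the job has fully ended, so assertions never race with the
    // asynchronous listener bus.
    private class SaveInfoListener extends SparkListener {
      private val completedStageInfos = new ArrayBuffer[StageInfo]
      private var jobCompletionCallback: (Int => Unit) = null // parameter is job ID

      def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq

      /** Register a callback to be called on job end. */
      def registerJobCompletionCallback(callback: (Int => Unit)): Unit = {
        jobCompletionCallback = callback
      }

      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        if (jobCompletionCallback != null) {
          jobCompletionCallback(jobEnd.jobId)
        }
      }

      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        completedStageInfos += stageCompleted.stageInfo
      }
    }

    // In a test: register the asserts first, then trigger the action that
    // runs the job. The asserts execute only after SparkListenerJobEnd is
    // delivered, so the completed stage infos are guaranteed to be present.
    listener.registerJobCompletionCallback { _ =>
      assert(listener.getCompletedStageInfos.size === 1)
    }
    rdd.count()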

Author: Andrew Or <and...@databricks.com>

Closes #8176 from andrewor14/fix-accumulator-suite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6518ef63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6518ef63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6518ef63

Branch: refs/heads/master
Commit: 6518ef63037aa56b541927f99ad26744f91098ce
Parents: 33bae58
Author: Andrew Or <and...@databricks.com>
Authored: Fri Aug 14 13:42:53 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri Aug 14 13:42:53 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/AccumulatorSuite.scala     | 153 +++++++++++--------
 1 file changed, 92 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6518ef63/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 0eb2293..5b84acf 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -182,26 +182,30 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     sc = new SparkContext("local", "test")
     sc.addSparkListener(listener)
     // Have each task add 1 to the internal accumulator
-    sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
+    val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
       TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
       iter
-    }.count()
-    val stageInfos = listener.getCompletedStageInfos
-    val taskInfos = listener.getCompletedTaskInfos
-    assert(stageInfos.size === 1)
-    assert(taskInfos.size === numPartitions)
-    // The accumulator values should be merged in the stage
-    val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
-    assert(stageAccum.value.toLong === numPartitions)
-    // The accumulator should be updated locally on each task
-    val taskAccumValues = taskInfos.map { taskInfo =>
-      val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
-      assert(taskAccum.update.isDefined)
-      assert(taskAccum.update.get.toLong === 1)
-      taskAccum.value.toLong
     }
-    // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
-    assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { _ =>
+      val stageInfos = listener.getCompletedStageInfos
+      val taskInfos = listener.getCompletedTaskInfos
+      assert(stageInfos.size === 1)
+      assert(taskInfos.size === numPartitions)
+      // The accumulator values should be merged in the stage
+      val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
+      assert(stageAccum.value.toLong === numPartitions)
+      // The accumulator should be updated locally on each task
+      val taskAccumValues = taskInfos.map { taskInfo =>
+        val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
+        assert(taskAccum.update.isDefined)
+        assert(taskAccum.update.get.toLong === 1)
+        taskAccum.value.toLong
+      }
+      // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
+      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    }
+    rdd.count()
   }
 
   test("internal accumulators in multiple stages") {
@@ -211,7 +215,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     sc.addSparkListener(listener)
     // Each stage creates its own set of internal accumulators so the
     // values for the same metric should not be mixed up across stages
-    sc.parallelize(1 to 100, numPartitions)
+    val rdd = sc.parallelize(1 to 100, numPartitions)
       .map { i => (i, i) }
       .mapPartitions { iter =>
         TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
@@ -227,16 +231,20 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
        TaskContext.get().internalMetricsToAccumulators(TEST_ACCUMULATOR) += 100
         iter
       }
-      .count()
-    // We ran 3 stages, and the accumulator values should be distinct
-    val stageInfos = listener.getCompletedStageInfos
-    assert(stageInfos.size === 3)
-    val firstStageAccum = findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR)
-    val secondStageAccum = findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR)
-    val thirdStageAccum = findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR)
-    assert(firstStageAccum.value.toLong === numPartitions)
-    assert(secondStageAccum.value.toLong === numPartitions * 10)
-    assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { _ =>
+      // We ran 3 stages, and the accumulator values should be distinct
+      val stageInfos = listener.getCompletedStageInfos
+      assert(stageInfos.size === 3)
+      val (firstStageAccum, secondStageAccum, thirdStageAccum) =
+        (findAccumulableInfo(stageInfos(0).accumulables.values, TEST_ACCUMULATOR),
+        findAccumulableInfo(stageInfos(1).accumulables.values, TEST_ACCUMULATOR),
+        findAccumulableInfo(stageInfos(2).accumulables.values, TEST_ACCUMULATOR))
+      assert(firstStageAccum.value.toLong === numPartitions)
+      assert(secondStageAccum.value.toLong === numPartitions * 10)
+      assert(thirdStageAccum.value.toLong === numPartitions * 2 * 100)
+    }
+    rdd.count()
   }
 
   test("internal accumulators in fully resubmitted stages") {
@@ -268,7 +276,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     // This says use 1 core and retry tasks up to 2 times
     sc = new SparkContext("local[1, 2]", "test")
     sc.addSparkListener(listener)
-    sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
+    val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitionsWithIndex { case (i, iter) =>
       val taskContext = TaskContext.get()
       taskContext.internalMetricsToAccumulators(TEST_ACCUMULATOR) += 1
       // Fail the first attempts of a subset of the tasks
@@ -276,28 +284,32 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
         throw new Exception("Failing a task intentionally.")
       }
       iter
-    }.count()
-    val stageInfos = listener.getCompletedStageInfos
-    val taskInfos = listener.getCompletedTaskInfos
-    assert(stageInfos.size === 1)
-    assert(taskInfos.size === numPartitions + numFailedPartitions)
-    val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
-    // We should not double count values in the merged accumulator
-    assert(stageAccum.value.toLong === numPartitions)
-    val taskAccumValues = taskInfos.flatMap { taskInfo =>
-      if (!taskInfo.failed) {
-        // If a task succeeded, its update value should always be 1
-        val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
-        assert(taskAccum.update.isDefined)
-        assert(taskAccum.update.get.toLong === 1)
-        Some(taskAccum.value.toLong)
-      } else {
-        // If a task failed, we should not get its accumulator values
-        assert(taskInfo.accumulables.isEmpty)
-        None
+    }
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { _ =>
+      val stageInfos = listener.getCompletedStageInfos
+      val taskInfos = listener.getCompletedTaskInfos
+      assert(stageInfos.size === 1)
+      assert(taskInfos.size === numPartitions + numFailedPartitions)
+      val stageAccum = findAccumulableInfo(stageInfos.head.accumulables.values, TEST_ACCUMULATOR)
+      // We should not double count values in the merged accumulator
+      assert(stageAccum.value.toLong === numPartitions)
+      val taskAccumValues = taskInfos.flatMap { taskInfo =>
+        if (!taskInfo.failed) {
+          // If a task succeeded, its update value should always be 1
+          val taskAccum = findAccumulableInfo(taskInfo.accumulables, TEST_ACCUMULATOR)
+          assert(taskAccum.update.isDefined)
+          assert(taskAccum.update.get.toLong === 1)
+          Some(taskAccum.value.toLong)
+        } else {
+          // If a task failed, we should not get its accumulator values
+          assert(taskInfo.accumulables.isEmpty)
+          None
+        }
       }
+      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
     }
-    assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+    rdd.count()
   }
 
 }
@@ -313,20 +325,27 @@ private[spark] object AccumulatorSuite {
       testName: String)(testBody: => Unit): Unit = {
     val listener = new SaveInfoListener
     sc.addSparkListener(listener)
-    // Verify that the accumulator does not already exist
+    // Register asserts in job completion callback to avoid flakiness
+    listener.registerJobCompletionCallback { jobId =>
+      if (jobId == 0) {
+        // The first job is a dummy one to verify that the accumulator does not already exist
+        val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
+        assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
+      } else {
+        // In the subsequent jobs, verify that peak execution memory is updated
+        val accum = listener.getCompletedStageInfos
+          .flatMap(_.accumulables.values)
+          .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
+          .getOrElse {
+          throw new TestFailedException(
+            s"peak execution memory accumulator not set in '$testName'", 0)
+        }
+        assert(accum.value.toLong > 0)
+      }
+    }
+    // Run the jobs
     sc.parallelize(1 to 10).count()
-    val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
-    assert(!accums.exists(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY))
     testBody
-    // Verify that peak execution memory is updated
-    val accum = listener.getCompletedStageInfos
-      .flatMap(_.accumulables.values)
-      .find(_.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
-      .getOrElse {
-        throw new TestFailedException(
-          s"peak execution memory accumulator not set in '$testName'", 0)
-      }
-    assert(accum.value.toLong > 0)
   }
 }
 
@@ -336,10 +355,22 @@ private[spark] object AccumulatorSuite {
 private class SaveInfoListener extends SparkListener {
   private val completedStageInfos: ArrayBuffer[StageInfo] = new ArrayBuffer[StageInfo]
   private val completedTaskInfos: ArrayBuffer[TaskInfo] = new ArrayBuffer[TaskInfo]
+  private var jobCompletionCallback: (Int => Unit) = null // parameter is job ID
 
   def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
   def getCompletedTaskInfos: Seq[TaskInfo] = completedTaskInfos.toArray.toSeq
 
+  /** Register a callback to be called on job end. */
+  def registerJobCompletionCallback(callback: (Int => Unit)): Unit = {
+    jobCompletionCallback = callback
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+    if (jobCompletionCallback != null) {
+      jobCompletionCallback(jobEnd.jobId)
+    }
+  }
+
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
     completedStageInfos += stageCompleted.stageInfo
   }

