This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d5ef2f  [SPARK-31052][TEST][CORE] Fix flaky test 
"DAGSchedulerSuite.shuffle fetch failed on speculative task, but original task 
succeed"
8d5ef2f is described below

commit 8d5ef2f766166cce3cc7a15a98ec016050ede4d8
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Mar 5 10:56:49 2020 -0800

    [SPARK-31052][TEST][CORE] Fix flaky test "DAGSchedulerSuite.shuffle fetch 
failed on speculative task, but original task succeed"
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the flaky test that was added in #27050.
    
    ### Why are the changes needed?
    
    `SparkListenerStageCompleted` is posted by `listenerBus` asynchronously, so we
    should make sure the listener has consumed the event before asserting on the
    completed stages.
    
    See [error 
message](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/119308/testReport/org.apache.spark.scheduler/DAGSchedulerSuite/shuffle_fetch_failed_on_speculative_task__but_original_task_succeed__SPARK_30388_/):
    
    ```
    sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
List(0, 1, 1) did not equal List(0, 1, 1, 0)
        at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
        at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
        at 
org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
        at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:503)
        at 
org.apache.spark.scheduler.DAGSchedulerSuite.$anonfun$new$88(DAGSchedulerSuite.scala:1976)
    ```
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated the test and verified locally that it no longer fails after being run
    hundreds of times. Note that the failure is easy to reproduce when running the
    test in a loop hundreds of times (e.g. 200).
    
    Closes #27809 from Ngone51/fix_flaky_spark_30388.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2b2fd32..4486389 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1933,7 +1933,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
-  test("shuffle fetch failed on speculative task, but original task succeed 
(SPARK-30388)") {
+  test("SPARK-30388: shuffle fetch failed on speculative task, but original 
task succeed") {
     var completedStage: List[Int] = Nil
     val listener = new SparkListener() {
       override def onStageCompleted(event: SparkListenerStageCompleted): Unit 
= {
@@ -1947,6 +1947,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     completeShuffleMapStageSuccessfully(0, 0, 2)
+    sc.listenerBus.waitUntilEmpty()
     assert(completedStage === List(0))
 
     // result task 0.0 succeed
@@ -1962,6 +1963,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
         info
       )
     )
+    sc.listenerBus.waitUntilEmpty()
     assert(completedStage === List(0, 1))
 
     Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
@@ -1973,6 +1975,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 
     // original result task 1.0 succeed
     runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, 42))
+    sc.listenerBus.waitUntilEmpty()
     assert(completedStage === List(0, 1, 1, 0))
     assert(scheduler.activeJobs.isEmpty)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to