This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new efe7fd2  [SPARK-31730][CORE][TEST] Fix flaky tests in BarrierTaskContextSuite
efe7fd2 is described below

commit efe7fd2b6bea4a945ed7f3f486ab279c505378b4
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
AuthorDate: Wed May 27 16:37:02 2020 -0700

    [SPARK-31730][CORE][TEST] Fix flaky tests in BarrierTaskContextSuite
    
    ### What changes were proposed in this pull request?
    
    Wait until all the executors have started before submitting any job. This avoids the flakiness caused by jobs having to wait for executors to come up.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #28584 from jiangxb1987/barrierTest.
    
    Authored-by: Xingbo Jiang <xingbo.ji...@databricks.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../spark/scheduler/BarrierTaskContextSuite.scala  | 26 +++++++++-------------
 1 file changed, 10 insertions(+), 16 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 6191e41..54899bf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 
 class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext 
with Eventually {
@@ -37,10 +38,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with 
LocalSparkContext with
       .setAppName("test-cluster")
       .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
+    TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
   }
 
-  // TODO (SPARK-31730): re-enable it
-  ignore("global sync by barrier() call") {
+  test("global sync by barrier() call") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -57,10 +58,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with 
LocalSparkContext with
   }
 
   test("share messages with allGather() call") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -78,10 +76,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with 
LocalSparkContext with
   }
 
   test("throw exception if we attempt to synchronize with different blocking 
calls") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -100,10 +95,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with 
LocalSparkContext with
   }
 
   test("successively sync with allGather and barrier") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -129,8 +121,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with 
LocalSparkContext with
     assert(times2.max - times2.min <= 1000)
   }
 
-  // TODO (SPARK-31730): re-enable it
-  ignore("support multiple barrier() call within a single task") {
+  test("support multiple barrier() call within a single task") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -285,6 +276,9 @@ class BarrierTaskContextSuite extends SparkFunSuite with 
LocalSparkContext with
 
   test("SPARK-31485: barrier stage should fail if only partial tasks are 
launched") {
     initLocalClusterSparkContext(2)
+    // It's required to reset the delay timer when a task is scheduled, otherwise all the tasks
+    // could get scheduled at ANY level.
+    sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true)
     val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
     val dep = new OneToOneDependency[Int](rdd0)
     // set up a barrier stage with 2 tasks and both tasks prefer executor 0 
(only 1 core) for


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to