This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 65530501bb2 [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case
65530501bb2 is described below

commit 65530501bb23a1e94f5138d9be0ef962bf33de76
Author: Mridul <mridulatgmail.com>
AuthorDate: Fri Nov 11 19:58:13 2022 +0900

    [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case
    
    ### What changes were proposed in this pull request?
    Fix a flaky test failure in `DAGSchedulerSuite`.
    
    ### Why are the changes needed?
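    The test had a multi-threading (MT) safety issue: `sentHosts` was an unsynchronized `ArrayBuffer` appended to from the mock's answer callback, so it is replaced with a synchronized list.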
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Ran the tests locally; the fix still needs to be validated on CI.
    
    Closes #38617 from mridulm/fix-test-failure.
    
    Authored-by: Mridul <mridulatgmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../apache/spark/scheduler/DAGSchedulerSuite.scala    | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f4e67eba40d..17abf3aef4e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.scheduler
 
-import java.util.Properties
+import java.util.{ArrayList => JArrayList, Collections => JCollections, 
Properties}
 import java.util.concurrent.{CountDownLatch, Delayed, ScheduledFuture, 
TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
 
 import scala.annotation.meta.param
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
@@ -4533,16 +4534,20 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       blockStoreClientField.setAccessible(true)
       blockStoreClientField.set(sc.env.blockManager, blockStoreClient)
 
-      val sentHosts = ArrayBuffer[String]()
+      val sentHosts = JCollections.synchronizedList(new JArrayList[String]())
       var hostAInterrupted = false
       doAnswer { (invoke: InvocationOnMock) =>
         val host = invoke.getArgument[String](0)
-        sendRequestsLatch.countDown()
         try {
           if (host == "hostA") {
-            canSendRequestLatch.await(timeoutSecs * 2, TimeUnit.SECONDS)
+            sendRequestsLatch.countDown()
+            canSendRequestLatch.await(timeoutSecs * 5, TimeUnit.SECONDS)
+            // should not reach here, will get interrupted by DAGScheduler
+            sentHosts.add(host)
+          } else {
+            sentHosts.add(host)
+            sendRequestsLatch.countDown()
           }
-          sentHosts += host
         } catch {
           case _: InterruptedException => hostAInterrupted = true
         } finally {
@@ -4559,8 +4564,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
       sendRequestsLatch.await()
       verify(blockStoreClient, times(2))
         .finalizeShuffleMerge(any(), any(), any(), any(), any())
-      assert(sentHosts.nonEmpty)
-      assert(sentHosts.head === "hostB" && sentHosts.length == 1)
+      assert(1 == sentHosts.size())
+      assert(sentHosts.asScala.toSeq === Seq("hostB"))
       completeLatch.await()
       assert(hostAInterrupted)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to