This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 65530501bb2 [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case 65530501bb2 is described below commit 65530501bb23a1e94f5138d9be0ef962bf33de76 Author: Mridul <mridulatgmail.com> AuthorDate: Fri Nov 11 19:58:13 2022 +0900 [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case ### What changes were proposed in this pull request? Fix flaky test failure ### Why are the changes needed? Multi-thread safety (MT-safety) issue in the test ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Local tests. Need to validate on CI Closes #38617 from mridulm/fix-test-failure. Authored-by: Mridul <mridulatgmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f4e67eba40d..17abf3aef4e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,11 +17,12 @@ package org.apache.spark.scheduler -import java.util.Properties +import java.util.{ArrayList => JArrayList, Collections => JCollections, Properties} import java.util.concurrent.{CountDownLatch, Delayed, ScheduledFuture, TimeUnit} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference} import scala.annotation.meta.param +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal @@ -4533,16 +4534,20 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti blockStoreClientField.setAccessible(true) blockStoreClientField.set(sc.env.blockManager, blockStoreClient) - val 
sentHosts = ArrayBuffer[String]() + val sentHosts = JCollections.synchronizedList(new JArrayList[String]()) var hostAInterrupted = false doAnswer { (invoke: InvocationOnMock) => val host = invoke.getArgument[String](0) - sendRequestsLatch.countDown() try { if (host == "hostA") { - canSendRequestLatch.await(timeoutSecs * 2, TimeUnit.SECONDS) + sendRequestsLatch.countDown() + canSendRequestLatch.await(timeoutSecs * 5, TimeUnit.SECONDS) + // should not reach here, will get interrupted by DAGScheduler + sentHosts.add(host) + } else { + sentHosts.add(host) + sendRequestsLatch.countDown() } - sentHosts += host } catch { case _: InterruptedException => hostAInterrupted = true } finally { @@ -4559,8 +4564,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti sendRequestsLatch.await() verify(blockStoreClient, times(2)) .finalizeShuffleMerge(any(), any(), any(), any(), any()) - assert(sentHosts.nonEmpty) - assert(sentHosts.head === "hostB" && sentHosts.length == 1) + assert(1 == sentHosts.size()) + assert(sentHosts.asScala.toSeq === Seq("hostB")) completeLatch.await() assert(hostAInterrupted) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org