This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 471a5b5  [SPARK-37594][SPARK-34399][SQL][TESTS] Make UT more stable
471a5b5 is described below

commit 471a5b55b80256ccd253c93623ff363add5f1985
Author: Angerszhuuuu <angers....@gmail.com>
AuthorDate: Fri Dec 10 10:53:31 2021 -0600

    [SPARK-37594][SPARK-34399][SQL][TESTS] Make UT more stable
    
    ### What changes were proposed in this pull request?
    There are some GA test failed caused by UT ` test("SPARK-34399: Add job 
commit duration metrics for DataWritingCommand") `  such as
    ```
    sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 was 
not greater than 0
        at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
        at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
        at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
        at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
        at 
org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$87(SQLMetricsSuite.scala:810)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at 
org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:305)
        at 
org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:303)
        at 
org.apache.spark.sql.execution.metric.SQLMetricsSuite.withTable(SQLMetricsSuite.scala:44)
        at 
org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$86(SQLMetricsSuite.scala:800)
        at 
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
        at 
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
        at 
org.apache.spark.sql.execution.metric.SQLMetricsSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLMetricsSuite.scala:44)
        at 
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246)
        at 
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244)
        at 
org.apache.spark.sql.execution.metric.SQLMetricsSuite.withSQLConf(SQLMetricsSuite.scala:44)
        at 
org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$85(SQLMetricsSuite.scala:800)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at 
org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$5(AdaptiveTestUtils.scala:65)
        at 
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
        at 
org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
        at 
org.apache.spark.sql.execution.metric.SQLMetricsSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLMetricsSuite.scala:44)
        at 
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246)
        at 
org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244)
        at 
org.apache.spark.sql.execution.metric.SQLMetricsSuite.withSQLConf(SQLMetricsSuite.scala:44)
        at 
org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$4(AdaptiveTestUtils.scala:65)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at 
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190)
        at 
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
        at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
        at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
        at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
        at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62)
        at 
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
        at 
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
        at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62)
        at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
    ```
    
    This pr to add a certain job commit delay and task commit delay to make it 
more stable.
    ### Why are the changes needed?
    Make unit test more stable.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existed UT
    
    Closes #34847 from AngersZhuuuu/SPARK-37594.
    
    Authored-by: Angerszhuuuu <angers....@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/sql/execution/metric/SQLMetricsSuite.scala       | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 32428fb..a51003e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import scala.reflect.{classTag, ClassTag}
 import scala.util.Random
 
-import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
@@ -803,6 +803,7 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
         val insert = df.queryExecution.executedPlan.collect {
           case CommandResultExec(_, dataWriting: DataWritingCommandExec, _) => 
dataWriting.cmd
         }
+        sparkContext.listenerBus.waitUntilEmpty()
         assert(insert.size == 1)
         
assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.JOB_COMMIT_TIME))
         
assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.TASK_COMMIT_TIME))
@@ -836,8 +837,15 @@ case class CustomFileCommitProtocol(
     dynamicPartitionOverwrite: Boolean = false)
   extends SQLHadoopMapReduceCommitProtocol(jobId, path, 
dynamicPartitionOverwrite) {
   override def commitTask(
-    taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
-    Thread.sleep(Random.nextInt(100))
+      taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = 
{
+    Thread.sleep(100)
     super.commitTask(taskContext)
   }
+
+  override def commitJob(
+      jobContext: JobContext,
+      taskCommits: Seq[FileCommitProtocol.TaskCommitMessage]): Unit = {
+    Thread.sleep(100)
+    super.commitJob(jobContext, taskCommits)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to