This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 8cd3c1a9c1c [SPARK-45568][TESTS] Fix flaky WholeStageCodegenSparkSubmitSuite
8cd3c1a9c1c is described below

commit 8cd3c1a9c1c336155fe09728171aba84ef55ef2d
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Oct 17 22:19:18 2023 +0800

    [SPARK-45568][TESTS] Fix flaky WholeStageCodegenSparkSubmitSuite
    
    ### What changes were proposed in this pull request?
    
    WholeStageCodegenSparkSubmitSuite is
    [flaky](https://github.com/apache/spark/actions/runs/6479534195/job/17593342589)
    because the default SHUFFLE_PARTITIONS (200) creates 200 reducers for a
    single total core, and an improper stop process causes the executor
    launcher to retry. The heavy load and retries might result in timeout
    test failures.
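    
    In short, the fix caps the shuffle partition count to match the single
    available core and wraps the workload in try/finally so the session is
    always stopped cleanly. A minimal standalone sketch of that pattern (the
    `FlakyFixSketch` name, the `local[1]` master, and the toy aggregation are
    illustrative assumptions, not the suite's code):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    object FlakyFixSketch {
      def main(args: Array[String]): Unit = {
        // A one-core local master stands in for the constrained test JVM.
        val spark = SparkSession.builder()
          .master("local[1]")
          // 2 shuffle partitions instead of the default 200, so the single
          // core is not flooded with tiny reducer tasks.
          .config("spark.sql.shuffle.partitions", "2")
          .getOrCreate()
        try {
          // Toy aggregation standing in for the codegen query under test.
          spark.range(10).groupBy("id").count().collect()
        } finally {
          // Always stop the session so the JVM shuts down cleanly instead of
          // leaving the launcher to retry against a dying driver.
          spark.stop()
        }
      }
    }
    ```
    
    The actual change, shown in the diff below, applies the same two ideas
    inside the suite's main method.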
    
    ### Why are the changes needed?
    
    CI robustness
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing WholeStageCodegenSparkSubmitSuite
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #43394 from yaooqinn/SPARK-45568.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit f00ec39542a5f9ac75d8c24f0f04a7be703c8d7c)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../WholeStageCodegenSparkSubmitSuite.scala        | 57 ++++++++++++----------
 1 file changed, 30 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
index 73c4e4c3e1e..06ba8fb772a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.functions.{array, col, count, lit}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.ResetSystemProperties
@@ -68,39 +69,41 @@ class WholeStageCodegenSparkSubmitSuite extends SparkSubmitTestUtils
 
 object WholeStageCodegenSparkSubmitSuite extends Assertions with Logging {
 
-  var spark: SparkSession = _
-
   def main(args: Array[String]): Unit = {
     TestUtils.configTestLog4j2("INFO")
 
-    spark = SparkSession.builder().getOrCreate()
+    val spark = SparkSession.builder()
+      .config(SQLConf.SHUFFLE_PARTITIONS.key, "2")
+      .getOrCreate()
+
+    try {
+      // Make sure the test is run where the driver and the executors use different object layouts
+      val driverArrayHeaderSize = Platform.BYTE_ARRAY_OFFSET
+      val executorArrayHeaderSize =
+        spark.sparkContext.range(0, 1).map(_ => Platform.BYTE_ARRAY_OFFSET).collect().head
+      assert(driverArrayHeaderSize > executorArrayHeaderSize)
 
-    // Make sure the test is run where the driver and the executors uses different object layouts
-    val driverArrayHeaderSize = Platform.BYTE_ARRAY_OFFSET
-    val executorArrayHeaderSize =
-      spark.sparkContext.range(0, 1).map(_ => Platform.BYTE_ARRAY_OFFSET).collect.head.toInt
-    assert(driverArrayHeaderSize > executorArrayHeaderSize)
+      val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) as "v")
+        .groupBy(array(col("v"))).agg(count(col("*")))
+      val plan = df.queryExecution.executedPlan
+      assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
 
-    val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) as "v")
-      .groupBy(array(col("v"))).agg(count(col("*")))
-    val plan = df.queryExecution.executedPlan
-    assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
+      val expectedAnswer =
+        Row(Array(0), 7178) ::
+          Row(Array(1), 7178) ::
+          Row(Array(2), 7178) ::
+          Row(Array(3), 7177) ::
+          Row(Array(4), 7177) ::
+          Row(Array(5), 7177) ::
+          Row(Array(6), 7177) ::
+          Row(Array(7), 7177) ::
+          Row(Array(8), 7177) ::
+          Row(Array(9), 7177) :: Nil
 
-    val expectedAnswer =
-      Row(Array(0), 7178) ::
-        Row(Array(1), 7178) ::
-        Row(Array(2), 7178) ::
-        Row(Array(3), 7177) ::
-        Row(Array(4), 7177) ::
-        Row(Array(5), 7177) ::
-        Row(Array(6), 7177) ::
-        Row(Array(7), 7177) ::
-        Row(Array(8), 7177) ::
-        Row(Array(9), 7177) :: Nil
-    val result = df.collect
-    QueryTest.sameRows(result.toSeq, expectedAnswer) match {
-      case Some(errMsg) => fail(errMsg)
-      case _ =>
+      QueryTest.checkAnswer(df, expectedAnswer)
+    } finally {
+      spark.stop()
     }
+
   }
 }
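
For reference, the expected counts above follow from 71773 = 10 × 7177 + 3: ids 0, 1, and 2 each occur 7178 times, and ids 3 through 9 occur 7177 times each.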


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
