This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e1f5a7c856ab [SPARK-48477][SQL][TESTS] Use withSQLConf in tests: Refactor CollationSuite, CoalesceShufflePartitionsSuite, SQLExecutionSuite
e1f5a7c856ab is described below

commit e1f5a7c856ab7ed4bf055553e490ee7c1307775a
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Thu May 30 14:07:10 2024 -0700

    [SPARK-48477][SQL][TESTS] Use withSQLConf in tests: Refactor CollationSuite, CoalesceShufflePartitionsSuite, SQLExecutionSuite
    
    ### What changes were proposed in this pull request?
    
    Use withSQLConf in tests where it is appropriate, so that config changes are scoped to the test body and restored afterwards.
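
    As a minimal sketch of the pattern (the `runQueryAndCheckPlan` helper below is hypothetical; the real call sites are in the diff), `withSQLConf` from `SQLConfHelper` sets the given entries only for the duration of the block and restores the previous values afterwards, whereas a bare `SET` statement leaks into subsequent test cases:

    ```scala
    // Hypothetical test body inside a suite that mixes in SQLConfHelper
    // (as CoalesceShufflePartitionsSuite does after this change).

    // Before: the config change persists after this test finishes.
    spark.sql("SET spark.sql.exchange.reuse=true")
    runQueryAndCheckPlan(spark)  // hypothetical helper

    // After: withSQLConf scopes the change and restores the prior value.
    withSQLConf("spark.sql.exchange.reuse" -> "true") {
      runQueryAndCheckPlan(spark)
    }
    ```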
    
    ### Why are the changes needed?
    
    Enforce good practice for setting configs in test cases: withSQLConf restores the original values after the block, so one test's settings do not leak into other tests.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Existing UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    NO
    
    Closes #46812 from amaliujia/sql_config_4.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/CollationSuite.scala      |  16 +--
 .../execution/CoalesceShufflePartitionsSuite.scala | 128 +++++++++++----------
 .../spark/sql/execution/SQLExecutionSuite.scala    |   9 +-
 3 files changed, 78 insertions(+), 75 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index 9b3bfe1c77b3..42da779b84ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -677,14 +677,14 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
       sql(s"INSERT INTO $tableName VALUES ('bbb', 'bbb')")
       sql(s"INSERT INTO $tableName VALUES ('BBB', 'BBB')")
 
-      sql(s"SET spark.sql.legacy.createHiveTableByDefault=false")
-
-      withTable(newTableName) {
-        checkError(
-          exception = intercept[AnalysisException] {
-            sql(s"CREATE TABLE $newTableName AS SELECT c1 || c2 FROM $tableName")
-          },
-          errorClass = "COLLATION_MISMATCH.IMPLICIT")
+      withSQLConf("spark.sql.legacy.createHiveTableByDefault" -> "false") {
+        withTable(newTableName) {
+          checkError(
+            exception = intercept[AnalysisException] {
+              sql(s"CREATE TABLE $newTableName AS SELECT c1 || c2 FROM $tableName")
+            },
+            errorClass = "COLLATION_MISMATCH.IMPLICIT")
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index e87b90dfdd84..dc72b4a092ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -28,7 +29,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ArrayImplicits._
 
-class CoalesceShufflePartitionsSuite extends SparkFunSuite {
+class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper {
 
   private var originalActiveSparkSession: Option[SparkSession] = _
   private var originalInstantiatedSparkSession: Option[SparkSession] = _
@@ -374,72 +375,73 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite {
 
   test("SPARK-24705 adaptive query execution works correctly when exchange 
reuse enabled") {
     val test: SparkSession => Unit = { spark: SparkSession =>
-      spark.sql("SET spark.sql.exchange.reuse=true")
-      val df = spark.range(0, 6, 1).selectExpr("id AS key", "id AS value")
-
-      // test case 1: a query stage has 3 child stages but they are the same stage.
-      // Final Stage 1
-      //   ShuffleQueryStage 0
-      //   ReusedQueryStage 0
-      //   ReusedQueryStage 0
-      val resultDf = df.join(df, "key").join(df, "key")
-      QueryTest.checkAnswer(resultDf, (0 to 5).map(i => Row(i, i, i, i)))
-      val finalPlan = resultDf.queryExecution.executedPlan
-        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
-      assert(finalPlan.collect {
-        case ShuffleQueryStageExec(_, r: ReusedExchangeExec, _) => r
-      }.length == 2)
-      assert(
-        finalPlan.collect {
-          case r @ CoalescedShuffleRead() => r
-        }.length == 3)
-
-
-      // test case 2: a query stage has 2 parent stages.
-      // Final Stage 3
-      //   ShuffleQueryStage 1
-      //     ShuffleQueryStage 0
-      //   ShuffleQueryStage 2
-      //     ReusedQueryStage 0
-      val grouped = df.groupBy((col("key") + 1).as("key")).agg(max("value").as("value"))
-      val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
-        .union(grouped.groupBy(col("key") + 2).max("value"))
-      QueryTest.checkAnswer(resultDf2, Row(2, 0) :: Row(3, 0) :: Row(3, 1) :: Row(4, 1) ::
-        Row(4, 2) :: Row(5, 2) :: Row(5, 3) :: Row(6, 3) :: Row(6, 4) :: Row(7, 4) :: Row(7, 5) ::
-        Row(8, 5) :: Nil)
-
-      val finalPlan2 = resultDf2.queryExecution.executedPlan
-        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
-
-      // The result stage has 2 children
-      val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
-      assert(level1Stages.length == 2)
-
-      assert(
-        finalPlan2.collect {
-          case r @ CoalescedShuffleRead() => r
-        }.length == 2, "finalPlan2")
+      withSQLConf("spark.sql.exchange.reuse" -> "true") {
+        val df = spark.range(0, 6, 1).selectExpr("id AS key", "id AS value")
+
+        // test case 1: a query stage has 3 child stages but they are the same stage.
+        // Final Stage 1
+        //   ShuffleQueryStage 0
+        //   ReusedQueryStage 0
+        //   ReusedQueryStage 0
+        val resultDf = df.join(df, "key").join(df, "key")
+        QueryTest.checkAnswer(resultDf, (0 to 5).map(i => Row(i, i, i, i)))
+        val finalPlan = resultDf.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+        assert(finalPlan.collect {
+          case ShuffleQueryStageExec(_, r: ReusedExchangeExec, _) => r
+        }.length == 2)
+        assert(
+          finalPlan.collect {
+            case r@CoalescedShuffleRead() => r
+          }.length == 3)
+
+
+        // test case 2: a query stage has 2 parent stages.
+        // Final Stage 3
+        //   ShuffleQueryStage 1
+        //     ShuffleQueryStage 0
+        //   ShuffleQueryStage 2
+        //     ReusedQueryStage 0
+        val grouped = df.groupBy((col("key") + 1).as("key")).agg(max("value").as("value"))
+        val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
+          .union(grouped.groupBy(col("key") + 2).max("value"))
+        QueryTest.checkAnswer(resultDf2, Row(2, 0) :: Row(3, 0) :: Row(3, 1) :: Row(4, 1) ::
+          Row(4, 2) :: Row(5, 2) :: Row(5, 3) :: Row(6, 3) :: Row(6, 4) :: Row(7, 4) :: Row(7, 5) ::
+          Row(8, 5) :: Nil)
+
+        val finalPlan2 = resultDf2.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
 
-      level1Stages.foreach(qs =>
-        assert(qs.plan.collect {
-          case r @ CoalescedShuffleRead() => r
-        }.length == 1,
-          "Wrong CoalescedShuffleRead below " + qs.simpleString(3)))
-
-      val leafStages = level1Stages.flatMap { stage =>
-        // All of the child stages of result stage have only one child stage.
-        val children = stage.plan.collect { case q: QueryStageExec => q }
-        assert(children.length == 1)
-        children
-      }
-      assert(leafStages.length == 2)
+        // The result stage has 2 children
+        val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
+        assert(level1Stages.length == 2)
+
+        assert(
+          finalPlan2.collect {
+            case r@CoalescedShuffleRead() => r
+          }.length == 2, "finalPlan2")
+
+        level1Stages.foreach(qs =>
+          assert(qs.plan.collect {
+            case r@CoalescedShuffleRead() => r
+          }.length == 1,
+            "Wrong CoalescedShuffleRead below " + qs.simpleString(3)))
+
+        val leafStages = level1Stages.flatMap { stage =>
+          // All of the child stages of result stage have only one child stage.
+          val children = stage.plan.collect { case q: QueryStageExec => q }
+          assert(children.length == 1)
+          children
+        }
+        assert(leafStages.length == 2)
 
-      val reusedStages = level1Stages.flatMap { stage =>
-        stage.plan.collect {
-          case ShuffleQueryStageExec(_, r: ReusedExchangeExec, _) => r
+        val reusedStages = level1Stages.flatMap { stage =>
+          stage.plan.collect {
+            case ShuffleQueryStageExec(_, r: ReusedExchangeExec, _) => r
+          }
         }
+        assert(reusedStages.length == 1)
       }
-      assert(reusedStages.length == 1)
     }
     withSparkSession(test, 400, None)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 48860f381efa..b8a109919f8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -29,12 +29,13 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
 import org.apache.spark.sql.types._
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
-class SQLExecutionSuite extends SparkFunSuite {
+class SQLExecutionSuite extends SparkFunSuite with SQLConfHelper {
 
   test("concurrent query execution (SPARK-10548)") {
     val conf = new SparkConf()
@@ -194,9 +195,9 @@ class SQLExecutionSuite extends SparkFunSuite {
           start.physicalPlanDescription.toLowerCase(Locale.ROOT).contains("project")
       })
       spark.sql("SELECT 1").collect()
-      spark.sql("SET k2 = v2")
-      spark.sql("SET redaction.password = 123")
-      spark.sql("SELECT 1").collect()
+      withSQLConf("k2" -> "v2", "redaction.password" -> "123") {
+        spark.sql("SELECT 1").collect()
+      }
       spark.sparkContext.listenerBus.waitUntilEmpty()
       assert(index.get() == 2)
     } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
