This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e1f5a7c856ab [SPARK-48477][SQL][TESTS] Use withSQLConf in tests: Refactor CollationSuite, CoalesceShufflePartitionsSuite, SQLExecutionSuite
e1f5a7c856ab is described below

commit e1f5a7c856ab7ed4bf055553e490ee7c1307775a
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Thu May 30 14:07:10 2024 -0700

    [SPARK-48477][SQL][TESTS] Use withSQLConf in tests: Refactor CollationSuite, CoalesceShufflePartitionsSuite, SQLExecutionSuite

    ### What changes were proposed in this pull request?

    Use withSQLConf in tests where it is appropriate, instead of issuing SET statements directly.

    ### Why are the changes needed?

    This enforces the good practice of scoping config changes in test cases: withSQLConf restores the previous values once the block finishes, so a config set by one test does not leak into later tests.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing UTs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46812 from amaliujia/sql_config_4.

    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/CollationSuite.scala      |  16 +--
 .../execution/CoalesceShufflePartitionsSuite.scala | 128 +++++++++++----------
 .../spark/sql/execution/SQLExecutionSuite.scala    |   9 +-
 3 files changed, 78 insertions(+), 75 deletions(-)
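To make the refactor concrete before the patch itself, here is a sketch of the before/after shape this PR applies. It is an illustration, not code from the patch: it assumes a test suite that mixes in Spark's SQLConfHelper (or SharedSparkSession), which provides withSQLConf(pairs: (String, String)*)(f: => Unit), and the config key and query are placeholders.

```scala
// Before: a raw SET statement mutates the session config for the rest of the
// suite, so later tests silently inherit the changed value.
sql("SET spark.sql.exchange.reuse=true")
sql("SELECT 1").collect()

// After: withSQLConf scopes the change to the block and restores the
// previous value (or unsets the key) on exit, even if the body throws.
withSQLConf("spark.sql.exchange.reuse" -> "true") {
  sql("SELECT 1").collect()
}
```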
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index 9b3bfe1c77b3..42da779b84ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -677,14 +677,14 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
       sql(s"INSERT INTO $tableName VALUES ('bbb', 'bbb')")
       sql(s"INSERT INTO $tableName VALUES ('BBB', 'BBB')")
 
-      sql(s"SET spark.sql.legacy.createHiveTableByDefault=false")
-
-      withTable(newTableName) {
-        checkError(
-          exception = intercept[AnalysisException] {
-            sql(s"CREATE TABLE $newTableName AS SELECT c1 || c2 FROM $tableName")
-          },
-          errorClass = "COLLATION_MISMATCH.IMPLICIT")
+      withSQLConf("spark.sql.legacy.createHiveTableByDefault" -> "false") {
+        withTable(newTableName) {
+          checkError(
+            exception = intercept[AnalysisException] {
+              sql(s"CREATE TABLE $newTableName AS SELECT c1 || c2 FROM $tableName")
+            },
+            errorClass = "COLLATION_MISMATCH.IMPLICIT")
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index e87b90dfdd84..dc72b4a092ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -28,7 +29,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.ArrayImplicits._
 
-class CoalesceShufflePartitionsSuite extends SparkFunSuite {
+class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper {
 
   private var originalActiveSparkSession: Option[SparkSession] = _
   private var originalInstantiatedSparkSession: Option[SparkSession] = _
@@ -374,72 +375,73 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite {
   test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
     val test: SparkSession => Unit = { spark: SparkSession =>
-      spark.sql("SET spark.sql.exchange.reuse=true")
-      val df = spark.range(0, 6, 1).selectExpr("id AS key", "id AS value")
-
-      // test case 1: a query stage has 3 child stages but they are the same stage.
-      // Final Stage 1
-      //   ShuffleQueryStage 0
-      //   ReusedQueryStage 0
-      //   ReusedQueryStage 0
-      val resultDf = df.join(df, "key").join(df, "key")
-      QueryTest.checkAnswer(resultDf, (0 to 5).map(i => Row(i, i, i, i)))
-      val finalPlan = resultDf.queryExecution.executedPlan
-        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
-      assert(finalPlan.collect {
-        case ShuffleQueryStageExec(_, r: ReusedExchangeExec, _) => r
-      }.length == 2)
-      assert(
-        finalPlan.collect {
-          case r @ CoalescedShuffleRead() => r
-        }.length == 3)
-
-
-      // test case 2: a query stage has 2 parent stages.
-      // Final Stage 3
-      //   ShuffleQueryStage 1
-      //     ShuffleQueryStage 0
-      //   ShuffleQueryStage 2
-      //     ReusedQueryStage 0
-      val grouped = df.groupBy((col("key") + 1).as("key")).agg(max("value").as("value"))
-      val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
-        .union(grouped.groupBy(col("key") + 2).max("value"))
-      QueryTest.checkAnswer(resultDf2, Row(2, 0) :: Row(3, 0) :: Row(3, 1) :: Row(4, 1) ::
-        Row(4, 2) :: Row(5, 2) :: Row(5, 3) :: Row(6, 3) :: Row(6, 4) :: Row(7, 4) :: Row(7, 5) ::
-        Row(8, 5) :: Nil)
-
-      val finalPlan2 = resultDf2.queryExecution.executedPlan
-        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
-
-      // The result stage has 2 children
-      val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
-      assert(level1Stages.length == 2)
-
-      assert(
-        finalPlan2.collect {
-          case r @ CoalescedShuffleRead() => r
-        }.length == 2, "finalPlan2")
+      withSQLConf("spark.sql.exchange.reuse" -> "true") {
+        val df = spark.range(0, 6, 1).selectExpr("id AS key", "id AS value")
+
+        // test case 1: a query stage has 3 child stages but they are the same stage.
+        // Final Stage 1
+        //   ShuffleQueryStage 0
+        //   ReusedQueryStage 0
+        //   ReusedQueryStage 0
+        val resultDf = df.join(df, "key").join(df, "key")
+        QueryTest.checkAnswer(resultDf, (0 to 5).map(i => Row(i, i, i, i)))
+        val finalPlan = resultDf.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+        assert(finalPlan.collect {
+          case ShuffleQueryStageExec(_, r: ReusedExchangeExec, _) => r
+        }.length == 2)
+        assert(
+          finalPlan.collect {
+            case r@CoalescedShuffleRead() => r
+          }.length == 3)
+
+
+        // test case 2: a query stage has 2 parent stages.
+        // Final Stage 3
+        //   ShuffleQueryStage 1
+        //     ShuffleQueryStage 0
+        //   ShuffleQueryStage 2
+        //     ReusedQueryStage 0
+        val grouped = df.groupBy((col("key") + 1).as("key")).agg(max("value").as("value"))
+        val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
+          .union(grouped.groupBy(col("key") + 2).max("value"))
+        QueryTest.checkAnswer(resultDf2, Row(2, 0) :: Row(3, 0) :: Row(3, 1) :: Row(4, 1) ::
+          Row(4, 2) :: Row(5, 2) :: Row(5, 3) :: Row(6, 3) :: Row(6, 4) :: Row(7, 4) :: Row(7, 5) ::
+          Row(8, 5) :: Nil)
+
+        val finalPlan2 = resultDf2.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
 
-      level1Stages.foreach(qs =>
-        assert(qs.plan.collect {
-          case r @ CoalescedShuffleRead() => r
-        }.length == 1,
-        "Wrong CoalescedShuffleRead below " + qs.simpleString(3)))
-
-      val leafStages = level1Stages.flatMap { stage =>
-        // All of the child stages of result stage have only one child stage.
-        val children = stage.plan.collect { case q: QueryStageExec => q }
-        assert(children.length == 1)
-        children
-      }
-      assert(leafStages.length == 2)
+        // The result stage has 2 children
+        val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
+        assert(level1Stages.length == 2)
+
+        assert(
+          finalPlan2.collect {
+            case r@CoalescedShuffleRead() => r
+          }.length == 2, "finalPlan2")
+
+        level1Stages.foreach(qs =>
+          assert(qs.plan.collect {
+            case r@CoalescedShuffleRead() => r
+          }.length == 1,
+          "Wrong CoalescedShuffleRead below " + qs.simpleString(3)))
+
+        val leafStages = level1Stages.flatMap { stage =>
+          // All of the child stages of result stage have only one child stage.
+          val children = stage.plan.collect { case q: QueryStageExec => q }
+          assert(children.length == 1)
+          children
+        }
+        assert(leafStages.length == 2)
 
-      val reusedStages = level1Stages.flatMap { stage =>
-        stage.plan.collect {
-          case ShuffleQueryStageExec(_, r: ReusedExchangeExec, _) => r
+        val reusedStages = level1Stages.flatMap { stage =>
+          stage.plan.collect {
+            case ShuffleQueryStageExec(_, r: ReusedExchangeExec, _) => r
+          }
         }
+        assert(reusedStages.length == 1)
       }
-      assert(reusedStages.length == 1)
     }
     withSparkSession(test, 400, None)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 48860f381efa..b8a109919f8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -29,12 +29,13 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
 import org.apache.spark.sql.types._
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.Utils.REDACTION_REPLACEMENT_TEXT
 
-class SQLExecutionSuite extends SparkFunSuite {
+class SQLExecutionSuite extends SparkFunSuite with SQLConfHelper {
 
   test("concurrent query execution (SPARK-10548)") {
     val conf = new SparkConf()
@@ -194,9 +195,9 @@ class SQLExecutionSuite extends SparkFunSuite {
         start.physicalPlanDescription.toLowerCase(Locale.ROOT).contains("project")
       })
       spark.sql("SELECT 1").collect()
-      spark.sql("SET k2 = v2")
-      spark.sql("SET redaction.password = 123")
1").collect() + withSQLConf("k2" -> "v2", "redaction.password" -> "123") { + spark.sql("SELECT 1").collect() + } spark.sparkContext.listenerBus.waitUntilEmpty() assert(index.get() == 2) } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org