This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 925b2a8 [SPARK-33680][SQL][TESTS][FOLLOWUP] Fix more test suites to have explicit confs 925b2a8 is described below commit 925b2a815d609dc18451b3d679b419df3f7689cf Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Mon Dec 7 18:59:15 2020 -0800 [SPARK-33680][SQL][TESTS][FOLLOWUP] Fix more test suites to have explicit confs ### What changes were proposed in this pull request? This is a follow-up for SPARK-33680 to remove the assumption on the default value of `spark.sql.adaptive.enabled`. ### Why are the changes needed? According to the test result https://github.com/apache/spark/pull/30628#issuecomment-739866168, the [previous run](https://github.com/apache/spark/pull/30628#issuecomment-739641105) didn't run all tests. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. Closes #30655 from dongjoon-hyun/SPARK-33680. Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit b2a79306ef7b330c5bf4dc1337ed80ebd6e08d0c) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/sql/DataFrameAggregateSuite.scala | 4 +- .../org/apache/spark/sql/DataFrameJoinSuite.scala | 4 +- .../scala/org/apache/spark/sql/JoinSuite.scala | 9 ++- .../apache/spark/sql/execution/PlannerSuite.scala | 73 +++++++++++++++------- .../spark/sql/sources/BucketedReadSuite.scala | 5 +- .../v1/sql/SqlResourceWithActualMetricsSuite.scala | 11 +++- 6 files changed, 74 insertions(+), 32 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index d4e64aa..78983a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -1001,7 +1001,9 @@ class
DataFrameAggregateSuite extends QueryTest Seq(true, false).foreach { value => test(s"SPARK-31620: agg with subquery (whole-stage-codegen = $value)") { - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString) { + withSQLConf( + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value.toString, + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withTempView("t1", "t2") { sql("create temporary view t1 as select * from values (1, 2) as t1(a, b)") sql("create temporary view t2 as select * from values (3, 4) as t2(c, d)") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 14d03a3..c317f56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -335,7 +335,9 @@ class DataFrameJoinSuite extends QueryTest withTempDatabase { dbName => withTable(table1Name, table2Name) { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { spark.range(50).write.saveAsTable(s"$dbName.$table1Name") spark.range(100).write.saveAsTable(s"$dbName.$table2Name") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 8755dcc..a728e5c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1107,6 +1107,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan test("SPARK-32330: Preserve shuffled hash join build side partitioning") { withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50", SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { @@ -1130,6 
+1131,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan // Test broadcast hash join withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") { Seq("inner", "left_outer").foreach(joinType => { val plan = df1.join(df2, $"k1" === $"k2", joinType) @@ -1146,6 +1148,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan // Test shuffled hash join withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50", SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { @@ -1253,6 +1256,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan withSQLConf( // Set broadcast join threshold and number of shuffle partitions, // as shuffled hash join depends on these two configs. + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", SQLConf.SHUFFLE_PARTITIONS.key -> "2") { val smjDF = df1.join(df2, joinExprs, "full") @@ -1284,7 +1288,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan ) inputDFs.foreach { case (df1, df2, joinType) => // Test broadcast hash join - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { val bhjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType) assert(bhjCodegenDF.queryExecution.executedPlan.collect { case WholeStageCodegenExec(_ : BroadcastHashJoinExec) => true @@ -1305,6 +1311,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan // Set broadcast join threshold and number of shuffle partitions, // as shuffled hash join depends on these two configs. 
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", SQLConf.SHUFFLE_PARTITIONS.key -> "2") { val shjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType) assert(shjCodegenDF.queryExecution.executedPlan.collect { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 5e30f84..4e01d1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -877,7 +877,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("aliases in the project should not introduce extra shuffle") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withTempView("df1", "df2") { spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1") spark.range(20).selectExpr("id AS key", "0").repartition($"key").createTempView("df2") @@ -897,7 +899,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { test("SPARK-33399: aliases should be handled properly in PartitioningCollection output" + " partitioning") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withTempView("t1", "t2", "t3") { spark.range(10).repartition($"id").createTempView("t1") spark.range(20).repartition($"id").createTempView("t2") @@ -927,7 +931,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("SPARK-33399: aliases should be handled properly in HashPartitioning") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + 
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withTempView("t1", "t2", "t3") { spark.range(10).repartition($"id").createTempView("t1") spark.range(20).repartition($"id").createTempView("t2") @@ -955,7 +961,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("SPARK-33399: alias handling should happen properly for RangePartitioning") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { val df = spark.range(1, 100) .select(col("id").as("id1")).groupBy("id1").count() // Plan for this will be Range -> ProjectWithAlias -> HashAggregate -> HashAggregate @@ -976,7 +984,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { test("SPARK-33399: aliased should be handled properly " + "for partitioning and sortorder involving complex expressions") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withTempView("t1", "t2", "t3") { spark.range(10).select(col("id").as("id1")).createTempView("t1") spark.range(20).select(col("id").as("id2")).createTempView("t2") @@ -1014,7 +1024,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("SPARK-33399: alias handling should happen properly for SinglePartition") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { val df = spark.range(1, 100, 1, 1) .select(col("id").as("id1")).groupBy("id1").count() val planned = df.queryExecution.executedPlan @@ -1031,7 +1043,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { test("SPARK-33399: No extra exchanges in 
case of" + " [Inner Join -> Project with aliases -> HashAggregate]") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withTempView("t1", "t2") { spark.range(10).repartition($"id").createTempView("t1") spark.range(20).repartition($"id").createTempView("t2") @@ -1060,7 +1074,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("SPARK-33400: Normalization of sortOrder should take care of sameOrderExprs") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withTempView("t1", "t2", "t3") { spark.range(10).repartition($"id").createTempView("t1") spark.range(20).repartition($"id").createTempView("t2") @@ -1091,7 +1107,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("sort order doesn't have repeated expressions") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withTempView("t1", "t2") { spark.range(10).repartition($"id").createTempView("t1") spark.range(20).repartition($"id").createTempView("t2") @@ -1117,7 +1135,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("aliases to expressions should not be replaced") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { withTempView("df1", "df2") { spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1") spark.range(20).selectExpr("id AS key", "0").repartition($"key").createTempView("df2") @@ -1143,7 +1163,9 @@ class PlannerSuite extends 
SharedSparkSession with AdaptiveSparkPlanHelper { } test("aliases in the aggregate expressions should not introduce extra shuffle") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { val t1 = spark.range(10).selectExpr("floor(id/4) as k1") val t2 = spark.range(20).selectExpr("floor(id/4) as k2") @@ -1160,7 +1182,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("aliases in the object hash/sort aggregate expressions should not introduce extra shuffle") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { Seq(true, false).foreach { useObjectHashAgg => withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> useObjectHashAgg.toString) { val t1 = spark.range(10).selectExpr("floor(id/4) as k1") @@ -1185,21 +1209,22 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } test("aliases in the sort aggregate expressions should not introduce extra sort") { - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { - withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { - val t1 = spark.range(10).selectExpr("floor(id/4) as k1") - val t2 = spark.range(20).selectExpr("floor(id/4) as k2") + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", + SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + val t1 = spark.range(10).selectExpr("floor(id/4) as k1") + val t2 = spark.range(20).selectExpr("floor(id/4) as k2") - val agg1 = t1.groupBy("k1").agg(collect_list("k1")).withColumnRenamed("k1", "k3") - val agg2 = t2.groupBy("k2").agg(collect_list("k2")) + val agg1 = t1.groupBy("k1").agg(collect_list("k1")).withColumnRenamed("k1", "k3") + val agg2 = t2.groupBy("k2").agg(collect_list("k2")) - 
val planned = agg1.join(agg2, $"k3" === $"k2").queryExecution.executedPlan - assert(planned.collect { case s: SortAggregateExec => s }.nonEmpty) + val planned = agg1.join(agg2, $"k3" === $"k2").queryExecution.executedPlan + assert(planned.collect { case s: SortAggregateExec => s }.nonEmpty) - // We expect two SortExec nodes on each side of join. - val sorts = planned.collect { case s: SortExec => s } - assert(sorts.size == 4) - } + // We expect two SortExec nodes on each side of join. + val sorts = planned.collect { case s: SortExec => s } + assert(sorts.size == 4) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 167e87d..0ff9303 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.{FileSourceScanExec, SortExec, SparkPlan} -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, DisableAdaptiveExecutionSuite} import org.apache.spark.sql.execution.datasources.BucketingUtils import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec @@ -39,7 +39,8 @@ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.util.Utils import org.apache.spark.util.collection.BitSet -class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSparkSession { +class BucketedReadWithoutHiveSupportSuite + extends BucketedReadSuite with DisableAdaptiveExecutionSuite with SharedSparkSession { protected override def 
beforeAll(): Unit = { super.beforeAll() assert(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala index 0c0e3ac..1510e89 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala @@ -26,7 +26,9 @@ import org.json4s.jackson.JsonMethods import org.apache.spark.SparkConf import org.apache.spark.deploy.history.HistoryServerSuite.getContentAndCode import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils +import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED import org.apache.spark.sql.test.SharedSparkSession case class Person(id: Int, name: String, age: Int) @@ -35,7 +37,8 @@ case class Salary(personId: Int, salary: Double) /** * Sql Resource Public API Unit Tests running query and extracting the metrics. 
*/ -class SqlResourceWithActualMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils { +class SqlResourceWithActualMetricsSuite + extends SharedSparkSession with SQLMetricsTestUtils with SQLHelper { import testImplicits._ @@ -52,8 +55,10 @@ class SqlResourceWithActualMetricsSuite extends SharedSparkSession with SQLMetri test("Check Sql Rest Api Endpoints") { // Materalize result DataFrame - val count = getDF().count() - assert(count == 2, s"Expected Query Count is 2 but received: $count") + withSQLConf(ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + val count = getDF().count() + assert(count == 2, s"Expected Query Count is 2 but received: $count") + } // Spark apps launched by local-mode seems not having `attemptId` as default // so UT is just added for existing endpoints. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org