This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 44f30a04dad [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite 44f30a04dad is described below commit 44f30a04dad2baa471b505f95c6a29992ee7ca72 Author: Kazuyuki Tanimura <ktanim...@apple.com> AuthorDate: Wed Aug 17 15:32:46 2022 -0700 [SPARK-40110][SQL][TESTS] Add JDBCWithAQESuite ### What changes were proposed in this pull request? This PR proposes to add `JDBCWithAQESuite` i.e. test cases of `JDBCSuite` with AQE (Adaptive Query Execution) enabled. ### Why are the changes needed? Currently `JDBCSuite` assumes that AQE is always turned off. We should also test with AQE turned on ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added the AQE version tests along with the non AQE version Closes #37544 from kazuyukitanimura/SPARK-40110. Authored-by: Kazuyuki Tanimura <ktanim...@apple.com> Signed-off-by: Chao Sun <sunc...@apple.com> --- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 32 ++++++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index b87fee6cec2..8eda0c288a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils} import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode} +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils} @@ -44,7 +45,8 @@ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -class JDBCSuite extends QueryTest with SharedSparkSession { +class JDBCSuite extends QueryTest with SharedSparkSession + with AdaptiveSparkPlanHelper with DisableAdaptiveExecutionSuite { import testImplicits._ val url = "jdbc:h2:mem:testdb0" @@ -298,10 +300,15 @@ class JDBCSuite extends QueryTest with SharedSparkSession { val parentPlan = df.queryExecution.executedPlan // Check if SparkPlan Filter is removed in a physical plan and // the plan only has PhysicalRDD to scan JDBCRelation. - assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]) - val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec] - assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec]) - assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation")) + val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) { + assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec]) + parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan + } else { + assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]) + parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child + } + assert(child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec]) + assert(child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation")) df } @@ -309,9 +316,14 @@ class JDBCSuite extends QueryTest with SharedSparkSession { val parentPlan = df.queryExecution.executedPlan // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD // cannot compile given predicates. - assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]) - val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec] - assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec]) + val child = if (df.sqlContext.conf.adaptiveExecutionEnabled) { + assert(parentPlan.isInstanceOf[AdaptiveSparkPlanExec]) + parentPlan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan + } else { + assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]) + parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec].child + } + assert(child.isInstanceOf[org.apache.spark.sql.execution.FilterExec]) df } @@ -1767,7 +1779,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession { def getRowCount(df: DataFrame): Long = { val queryExecution = df.queryExecution - val rawPlan = queryExecution.executedPlan.collect { + val rawPlan = collect(queryExecution.executedPlan) { case p: DataSourceScanExec => p } match { case Seq(p) => p @@ -1964,3 +1976,5 @@ class JDBCSuite extends QueryTest with SharedSparkSession { } } } + +class JDBCWithAQESuite extends JDBCSuite with EnableAdaptiveExecutionSuite --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org