This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4128d0b9e1a4 [SPARK-50994][SQL][FOLLOWUP] Do not use RDD with tracked execution in QueryTest
4128d0b9e1a4 is described below

commit 4128d0b9e1a4477132c505d3bb23ec1aeb275217
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Mar 31 19:04:39 2025 +0800

    [SPARK-50994][SQL][FOLLOWUP] Do not use RDD with tracked execution in QueryTest

    ### What changes were proposed in this pull request?

    This is a follow-up of https://github.com/apache/spark/pull/49678 .
    The test only needs to verify deserialization to an RDD; tracking the
    execution and triggering SQL execution events is unnecessary and makes
    tests less stable.

    ### Why are the changes needed?

    To make tests more stable.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    N/A

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #50457 from cloud-fan/rdd.

    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
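For context, the shape of the change: `materializedRdd` is the plain
deserializing RDD, while the public `Dataset.rdd` additionally runs under a
tracked SQL execution that posts listener-bus events. A minimal sketch of
that relationship, assuming `SQLExecution.withNewExecutionId` is the
tracking wrapper (the exact wiring inside Dataset.scala may differ):

    // Sketch only, not the literal Dataset.scala source.
    // The untracked RDD just deserializes each InternalRow's single
    // object column back to T:
    private[sql] lazy val materializedRdd: RDD[T] = {
      val objectType = exprEnc.deserializer.dataType
      rddQueryExecution.toRdd.mapPartitions { rows =>
        rows.map(_.get(0, objectType).asInstanceOf[T])
      }
    }

    // Assumed wiring: the public `rdd` adds execution tracking, which
    // posts the SQL execution start/end events this follow-up avoids.
    @transient lazy val rdd: RDD[T] =
      SQLExecution.withNewExecutionId(rddQueryExecution) {
        materializedRdd
      }

QueryTest and StatFunctions only need the deserialization result, not the
tracking, so they now call `materializedRdd` directly, as the diff below
shows.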
---
 sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala      | 2 +-
 .../main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala  | 3 ++-
 sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala            | 2 +-
 .../test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala  | 2 +-
 4 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
index 9652add9cac3..366cc3f4b7d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
@@ -1573,7 +1573,7 @@ class Dataset[T] private[sql](
     sparkSession.sessionState.executePlan(deserialized)
   }
 
-  private lazy val materializedRdd: RDD[T] = {
+  private[sql] lazy val materializedRdd: RDD[T] = {
     val objectType = exprEnc.deserializer.dataType
     rddQueryExecution.toRdd.mapPartitions { rows =>
       rows.map(_.get(0, objectType).asInstanceOf[T])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 89fc69cd2bdd..9bf567df57c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -98,7 +98,8 @@ object StatFunctions extends Logging {
         sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
       sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) }
     }
-    val summaries = df.select(columns: _*).rdd.treeAggregate(emptySummaries)(apply, merge)
+    val summaries = df.select(columns: _*).materializedRdd
+      .treeAggregate(emptySummaries)(apply, merge)
     summaries.map { summary =>
       summary.query(probabilities) match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 42b3b12efe39..d2d119d1f581 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -288,7 +288,7 @@ object QueryTest extends Assertions {
     val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
     if (checkToRDD) {
       SQLExecution.withSQLConfPropagated(df.sparkSession) {
-        df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791]
+        df.materializedRdd.count() // Also attempt to deserialize as an RDD [SPARK-15791]
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index fb86ebf74f5f..78d30866b4f2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2707,7 +2707,7 @@ class SQLQuerySuite extends SQLQuerySuiteBase with DisableAdaptiveExecutionSuite
     checkAnswer(sql(s"SELECT id FROM $targetTable"), Row(1) :: Row(2) :: Row(3) :: Nil)
 
     spark.sparkContext.listenerBus.waitUntilEmpty()
-    assert(commands.size == 4)
+    assert(commands.size == 3)
     assert(commands.head.nodeName == "Execute CreateHiveTableAsSelectCommand")
 
     val v1WriteCommand = commands(1)
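The `commands.size` assertion drops from 4 to 3 because `checkAnswer` goes
through QueryTest, whose RDD deserialization check no longer starts a
tracked execution, so one fewer SQL execution event reaches the listener.
A hedged sketch of the kind of listener such suites use to collect these
events (the collector name and shape here are illustrative, not the
suite's exact code):

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

    // Records the root node name of every tracked SQL execution. After
    // this commit, QueryTest's df.materializedRdd.count() posts no such
    // event, so one fewer entry shows up per checkAnswer call.
    class SQLExecutionCollector extends SparkListener {
      val nodeNames = ArrayBuffer.empty[String]
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: SparkListenerSQLExecutionStart =>
          nodeNames += e.sparkPlanInfo.nodeName
        case _ => // ignore non-SQL events
      }
    }

    // Usage sketch (mirrors the suite's pattern):
    //   val collector = new SQLExecutionCollector
    //   spark.sparkContext.addSparkListener(collector)
    //   ... run the queries under test ...
    //   spark.sparkContext.listenerBus.waitUntilEmpty()
    //   collector.nodeNames  // one entry per tracked SQL execution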