This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4128d0b9e1a4 [SPARK-50994][SQL][FOLLOWUP] Do not use RDD with tracked 
execution in QueryTest
4128d0b9e1a4 is described below

commit 4128d0b9e1a4477132c505d3bb23ec1aeb275217
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Mar 31 19:04:39 2025 +0800

    [SPARK-50994][SQL][FOLLOWUP] Do not use RDD with tracked execution in 
QueryTest
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/49678 . The 
intention of the test is to verify the deserialization to an RDD. We don't need 
to track the execution and trigger SQL execution events, which makes tests less 
stable.
    
    ### Why are the changes needed?
    
    make tests more stable
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #50457 from cloud-fan/rdd.
    
    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala     | 2 +-
 .../main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala | 3 ++-
 sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala           | 2 +-
 .../test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2 +-
 4 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
index 9652add9cac3..366cc3f4b7d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
@@ -1573,7 +1573,7 @@ class Dataset[T] private[sql](
     sparkSession.sessionState.executePlan(deserialized)
   }
 
-  private lazy val materializedRdd: RDD[T] = {
+  private[sql] lazy val materializedRdd: RDD[T] = {
     val objectType = exprEnc.deserializer.dataType
     rddQueryExecution.toRdd.mapPartitions { rows =>
       rows.map(_.get(0, objectType).asInstanceOf[T])
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 89fc69cd2bdd..9bf567df57c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -98,7 +98,8 @@ object StatFunctions extends Logging {
         sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
       sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) 
}
     }
-    val summaries = df.select(columns: 
_*).rdd.treeAggregate(emptySummaries)(apply, merge)
+    val summaries = df.select(columns: _*).materializedRdd
+      .treeAggregate(emptySummaries)(apply, merge)
 
     summaries.map {
       summary => summary.query(probabilities) match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 42b3b12efe39..d2d119d1f581 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -288,7 +288,7 @@ object QueryTest extends Assertions {
     val isSorted = df.logicalPlan.collect { case s: logical.Sort => s 
}.nonEmpty
     if (checkToRDD) {
       SQLExecution.withSQLConfPropagated(df.sparkSession) {
-        df.rdd.count() // Also attempt to deserialize as an RDD [SPARK-15791]
+        df.materializedRdd.count() // Also attempt to deserialize as an RDD 
[SPARK-15791]
       }
     }
 
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index fb86ebf74f5f..78d30866b4f2 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2707,7 +2707,7 @@ class SQLQuerySuite extends SQLQuerySuiteBase with 
DisableAdaptiveExecutionSuite
                     checkAnswer(sql(s"SELECT id FROM $targetTable"),
                       Row(1) :: Row(2) :: Row(3) :: Nil)
                     spark.sparkContext.listenerBus.waitUntilEmpty()
-                    assert(commands.size == 4)
+                    assert(commands.size == 3)
                     assert(commands.head.nodeName == "Execute 
CreateHiveTableAsSelectCommand")
 
                     val v1WriteCommand = commands(1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to