This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 0856ab433d5c [SPARK-52741][SQL] RemoveFiles ShuffleCleanup mode doesnt 
work with non-adaptive execution
0856ab433d5c is described below

commit 0856ab433d5cc1f7abba841dbc3ddbf4f46d4151
Author: Karuppayya Rajendran <karuppayya.rajend...@salesforce.com>
AuthorDate: Fri Aug 15 16:55:02 2025 +0800

    [SPARK-52741][SQL] RemoveFiles ShuffleCleanup mode doesnt work with 
non-adaptive execution
    
    ### What changes were proposed in this pull request?
    Currently, shuffle cleanup only works for adaptive execution plans. 
Non-adaptive execution plans are not cleaned up. Thing change cleans it.
    
    ### Why are the changes needed?
    - To cleanup shuffle files of non-adaptive query executions
    - Consistency in behavior between adaptive and non-adaptive shuffle cleanup 
based on the cleanup mode
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Modified existing unit tests to cover this case
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #51432 from karuppayya/SPARK-52741.
    
    Lead-authored-by: Karuppayya Rajendran <karuppayya.rajend...@salesforce.com>
    Co-authored-by: Karuppayya Rajendran <karuppayya1...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 337a67ff58ad91f2f86751bcb9a5e50e1de5cef2)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/SQLExecution.scala  |  8 ++-
 .../spark/sql/execution/QueryExecutionSuite.scala  | 63 +++++++++++++---------
 2 files changed, 45 insertions(+), 26 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 9dcb38f8ff10..c5c2f9bb6a6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, 
SPARK_EXECUTOR_PRE
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, 
SparkListenerSQLExecutionStart}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.SQL_EVENT_TRUNCATE_LENGTH
@@ -178,8 +179,11 @@ object SQLExecution extends Logging {
                 val shuffleIds = queryExecution.executedPlan match {
                   case ae: AdaptiveSparkPlanExec =>
                     ae.context.shuffleIds.asScala.keys
-                  case _ =>
-                    Iterable.empty
+                  case nonAdaptivePlan =>
+                    nonAdaptivePlan.collect {
+                      case exec: ShuffleExchangeLike =>
+                        exec.shuffleId
+                    }
                 }
                 shuffleIds.foreach { shuffleId =>
                   queryExecution.shuffleCleanupMode match {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index a64902437086..47d5ff67b840 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -327,36 +327,51 @@ class QueryExecutionSuite extends SharedSparkSession {
   }
 
   test("SPARK-47764: Cleanup shuffle dependencies - DoNotCleanup mode") {
-    val plan = spark.range(100).repartition(10).logicalPlan
-    val df = Dataset.ofRows(spark, plan, DoNotCleanup)
-    df.collect()
-
-    val blockManager = spark.sparkContext.env.blockManager
-    assert(blockManager.migratableResolver.getStoredShuffles().nonEmpty)
-    assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
-    cleanupShuffles()
+    Seq(true, false).foreach { adaptiveEnabled => {
+      withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, 
adaptiveEnabled.toString)) {
+        val plan = spark.range(100).repartition(10).logicalPlan
+        val df = Dataset.ofRows(spark, plan, DoNotCleanup)
+        df.collect()
+
+        val blockManager = spark.sparkContext.env.blockManager
+        assert(blockManager.migratableResolver.getStoredShuffles().nonEmpty)
+        assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
+        cleanupShuffles()
+        }
+      }
+    }
   }
 
   test("SPARK-47764: Cleanup shuffle dependencies - SkipMigration mode") {
-    val plan = spark.range(100).repartition(10).logicalPlan
-    val df = Dataset.ofRows(spark, plan, SkipMigration)
-    df.collect()
-
-    val blockManager = spark.sparkContext.env.blockManager
-    assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
-    assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
-    cleanupShuffles()
+    Seq(true, false).foreach { adaptiveEnabled => {
+      withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, 
adaptiveEnabled.toString)) {
+        val plan = spark.range(100).repartition(10).logicalPlan
+        val df = Dataset.ofRows(spark, plan, SkipMigration)
+        df.collect()
+
+        val blockManager = spark.sparkContext.env.blockManager
+        assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
+        assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
+        cleanupShuffles()
+        }
+      }
+    }
   }
 
   test("SPARK-47764: Cleanup shuffle dependencies - RemoveShuffleFiles mode") {
-    val plan = spark.range(100).repartition(10).logicalPlan
-    val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles)
-    df.collect()
-
-    val blockManager = spark.sparkContext.env.blockManager
-    assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
-    assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
-    cleanupShuffles()
+    Seq(true, false).foreach { adaptiveEnabled => {
+      withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, 
adaptiveEnabled.toString)) {
+        val plan = spark.range(100).repartition(10).logicalPlan
+        val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles)
+        df.collect()
+
+        val blockManager = spark.sparkContext.env.blockManager
+        assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
+        assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
+        cleanupShuffles()
+        }
+      }
+    }
   }
 
   test("SPARK-35378: Return UnsafeRow in CommandResultExecCheck execute 
methods") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to