This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c0911981887 [SPARK-41914][SQL] FileFormatWriter materializes AQE plan before accessing outputOrdering
c0911981887 is described below

commit c091198188789afb1282bc76419cf6e1397b0bc9
Author: Enrico Minack <git...@enrico.minack.dev>
AuthorDate: Tue Jan 10 13:10:07 2023 +0800

    [SPARK-41914][SQL] FileFormatWriter materializes AQE plan before accessing outputOrdering
    
    ### What changes were proposed in this pull request?
    The `FileFormatWriter` now materializes an `AdaptiveSparkPlanExec` before accessing
    the plan's `outputOrdering`. This is only required when planned writing is disabled
    (`spark.sql.optimizer.plannedWrite.enabled` is `true` by default); with planned
    writing enabled, `FileFormatWriter` already receives the final plan.
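    
    For context, a minimal sketch of a write that exercises this code path (illustrative
    only; it assumes a running `SparkSession` named `spark`, and the output path is made up):
    
    ```
    // Illustrative sketch: with planned write disabled and AQE enabled, the physical plan
    // handed to FileFormatWriter is wrapped in an AdaptiveSparkPlanExec.
    spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "false")
    spark.conf.set("spark.sql.adaptive.enabled", "true")

    spark.range(100)
      .selectExpr("id % 10 AS key", "cast(id AS string) AS value")
      .sortWithinPartitions("key", "value")   // in-partition sort by key, value
      .write
      .partitionBy("key")                     // the write itself only requires ordering by key
      .mode("overwrite")
      .parquet("/tmp/spark-41914-demo")       // illustrative path
    ```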
    
    ### Why are the changes needed?
    `FileFormatWriter` enforces an ordering if the written plan does not provide the
    required ordering. An unmaterialized `AdaptiveSparkPlanExec` does not know its final
    ordering, so `FileFormatWriter` enforces the required ordering (e.g. by column `"a"`)
    even if the final plan already provides a compatible ordering (e.g. by columns
    `"a", "b"`). In case of spilling, the extra enforced sort breaks that compatible
    ordering (e.g. by columns `"a", "b"`), see SPARK-40588.
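    
    For illustration, the compatibility rule boils down to a prefix check: the required
    ordering is satisfied when it is a prefix of the ordering the plan already provides.
    A simplified, hypothetical sketch of that idea (not the actual
    `V1WritesUtils.isOrderingMatched` implementation, which compares expressions
    semantically rather than by name):
    
    ```
    // Simplified sketch of the prefix semantics described above.
    def orderingMatched(required: Seq[String], actual: Seq[String]): Boolean =
      required.length <= actual.length &&
        required.zip(actual).forall { case (r, a) => r == a }

    orderingMatched(Seq("a"), Seq("a", "b"))   // true: no extra sort needs to be enforced
    orderingMatched(Seq("a", "b"), Seq("a"))   // false: an extra sort is required
    ```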
    
    ### Does this PR introduce _any_ user-facing change?
    This fixes SPARK-40588 for Spark 3.4. The issue was introduced in Spark 3.0; this
    change restores the behaviour of Spark 2.4.
    
    ### How was this patch tested?
    The final plan that writes the files is now stored in `FileFormatWriter.executedPlan`
    (similar to the existing `FileFormatWriter.outputOrderingMatched`). Unit tests assert
    the outermost sort order of that plan.
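    
    For illustration, a simplified sketch of what such a test can now do (the actual
    assertions in the diff below pattern-match on `SortOrder` directly):
    
    ```
    // Sketch only: grab the plan captured by FileFormatWriter during the write and
    // check that the outermost SortExec sorts by the user-given columns, in order.
    // (If the captured plan is an AdaptiveSparkPlanExec, unwrap it first, as the
    // added tests do, because collectFirst does not descend into its inner plan.)
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.execution.SortExec
    import org.apache.spark.sql.execution.datasources.FileFormatWriter

    val plan = FileFormatWriter.executedPlan.get
    val outermostSort = plan.collectFirst { case s: SortExec => s }
    val sortedColumns = outermostSort.toSeq.flatMap(_.sortOrder).map(_.child).collect {
      case a: AttributeReference => a.name
    }
    assert(sortedColumns == Seq("key", "value"))
    ```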
    
    The plan that actually writes the files changed from (taken from the test
    `"SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column"`):
    
    ```
    Sort [input[2, int, false] ASC NULLS FIRST], false, 0
    +- *(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
       +- *(3) Project [b#24, value#14, key#13]
          +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
             :- BroadcastQueryStage 2
             :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=376]
             :     +- AQEShuffleRead local
             :        +- ShuffleQueryStage 0
             :           +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [plan_id=328]
             :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
             :                 +- Scan[obj#12]
             +- AQEShuffleRead local
                +- ShuffleQueryStage 1
                   +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [plan_id=345]
                      +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                         +- Scan[obj#22]
    ```
    
    where `FileFormatWriter` enforces the ordering with an extra
    `Sort [input[2, int, false] ASC NULLS FIRST], false, 0` node, to
    
    ```
    *(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
    +- *(3) Project [b#24, value#14, key#13]
       +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
          :- BroadcastQueryStage 2
          :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=375]
          :     +- AQEShuffleRead local
          :        +- ShuffleQueryStage 0
          :           +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [plan_id=327]
          :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
          :                 +- Scan[obj#12]
          +- AQEShuffleRead local
             +- ShuffleQueryStage 1
                +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [plan_id=344]
                   +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                      +- Scan[obj#22]
    ```
    
    where the sort given by the user is now the outermost sort.
    
    Closes #39431 from EnricoMi/branch-materialize-aqe-plan.
    
    Authored-by: Enrico Minack <git...@enrico.minack.dev>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   2 +
 .../execution/datasources/FileFormatWriter.scala   |  36 ++++++-
 .../datasources/V1WriteCommandSuite.scala          | 115 +++++++++++++++++++--
 3 files changed, 138 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 62a75e75345..395e5468b64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -225,6 +225,8 @@ case class AdaptiveSparkPlanExec(
       .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }
 
+  def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 8ce87d6fbe1..6285095c647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -62,6 +63,11 @@ object FileFormatWriter extends Logging {
    */
   private[sql] var outputOrderingMatched: Boolean = false
 
+  /**
+   * A variable used in tests to check the final executed plan.
+   */
+  private[sql] var executedPlan: Option[SparkPlan] = None
+
   // scalastyle:off argcount
   /**
    * Basic work flow of this command is:
@@ -138,9 +144,21 @@ object FileFormatWriter extends Logging {
     val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
         writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
     val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)
+
+    // SPARK-40588: when planned writing is disabled and AQE is enabled,
+    // plan contains an AdaptiveSparkPlanExec, which does not know
+    // its final plan's ordering, so we have to materialize that plan first
+    // it is fine to use plan further down as the final plan is cached in that plan
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+
     // the sort order doesn't matter
     // Use the output ordering from the original plan before adding the empty2null projection.
-    val actualOrdering = writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
+    val actualOrdering = writeFilesOpt.map(_.child)
+      .getOrElse(materializeAdaptiveSparkPlan(plan))
+      .outputOrdering.map(_.child)
     val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)
 
     SQLExecution.checkSQLExecutionId(sparkSession)
@@ -198,19 +216,24 @@ object FileFormatWriter extends Logging {
     }
 
     writeAndCommit(job, description, committer) {
-      val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+      val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
+        (empty2NullPlan, None)
       } else {
         val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec)
         val concurrentOutputWriterSpec = createConcurrentOutputWriterSpec(
           sparkSession, sortPlan, sortColumns)
         if (concurrentOutputWriterSpec.isDefined) {
-          (empty2NullPlan.execute(), concurrentOutputWriterSpec)
+          (empty2NullPlan, concurrentOutputWriterSpec)
         } else {
-          (sortPlan.execute(), concurrentOutputWriterSpec)
+          (sortPlan, concurrentOutputWriterSpec)
         }
       }
 
+      // In testing, this is the only way to get hold of the actually executed plan written to file
+      if (Utils.isTesting) executedPlan = Some(planToExecute)
+
+      val rdd = planToExecute.execute()
+
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
       // partition rdd to make sure we at least set up one write task to write the metadata.
       val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
@@ -281,6 +304,9 @@ object FileFormatWriter extends Logging {
     val committer = writeFilesSpec.committer
     val description = writeFilesSpec.description
 
+    // In testing, this is the only way to get hold of the actually executed plan written to file
+    if (Utils.isTesting) executedPlan = Some(planForWrites)
+
     writeAndCommit(job, description, committer) {
       val rdd = planForWrites.executeWrite(writeFilesSpec)
       val ret = new Array[WriteTaskResult](rdd.partitions.length)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index e9c5c77e6d9..80d0369044c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -18,10 +18,13 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.spark.sql.util.QueryExecutionListener
 
 trait V1WriteCommandSuiteBase extends SQLTestUtils {
@@ -52,8 +55,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
   }
 
   /**
-   * Execute a write query and check ordering of the plan. Return the optimized logical write
-   * query plan.
+   * Execute a write query and check ordering of the plan.
    */
   protected def executeAndCheckOrdering(
       hasLogicalSort: Boolean,
@@ -160,12 +162,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
             |CREATE TABLE t(i INT, k STRING) USING PARQUET
             |PARTITIONED BY (j INT)
             |""".stripMargin)
-        // When planned write is disabled, even though the write plan is already sorted,
-        // the AQE node inserted on top of the write query will remove the original
-        // sort orders. So the ordering will not match. This issue does not exist when
-        // planned write is enabled, because AQE will be applied on top of the write
-        // command instead of on top of the child query plan.
-        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = enabled) {
+        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
           sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j")
         }
       }
@@ -181,13 +178,111 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
             |PARTITIONED BY (k STRING)
             |""".stripMargin)
         executeAndCheckOrdering(
-          hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) {
+          hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
           sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
         }
       }
     }
   }
 
+  test("SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withPlannedWrite { enabled =>
+        withTable("t") {
+          sql(
+            """
+              |CREATE TABLE t(b INT, value STRING) USING PARQUET
+              |PARTITIONED BY (key INT)
+              |""".stripMargin)
+          executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
+            sql(
+              """
+                |INSERT INTO t
+                |SELECT b, value, key
+                |FROM testData JOIN testData2 ON key = a
+                |SORT BY key, value
+                |""".stripMargin)
+          }
+
+          // inspect the actually executed plan (that is different to executeAndCheckOrdering)
+          assert(FileFormatWriter.executedPlan.isDefined)
+          val executedPlan = FileFormatWriter.executedPlan.get
+
+          val plan = if (enabled) {
+            assert(executedPlan.isInstanceOf[WriteFilesExec])
+            executedPlan.asInstanceOf[WriteFilesExec].child
+          } else {
+            executedPlan.transformDown {
+              case a: AdaptiveSparkPlanExec => a.executedPlan
+            }
+          }
+
+          // assert the outer most sort in the executed plan
+          assert(plan.collectFirst {
+            case s: SortExec => s
+          }.exists {
+            case SortExec(Seq(
+            SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _),
+            SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
+            ), false, _, _) => true
+            case _ => false
+          }, plan)
+        }
+      }
+    }
+  }
+
+  test("SPARK-41914: v1 write with AQE and in-partition sorted - string partition column") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(key INT, b INT) USING PARQUET
+            |PARTITIONED BY (value STRING)
+            |""".stripMargin)
+        executeAndCheckOrdering(
+          hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
+          sql(
+            """
+              |INSERT INTO t
+              |SELECT key, b, value
+              |FROM testData JOIN testData2 ON key = a
+              |SORT BY value, key
+              |""".stripMargin)
+        }
+
+        // inspect the actually executed plan (that is different to executeAndCheckOrdering)
+        assert(FileFormatWriter.executedPlan.isDefined)
+        val executedPlan = FileFormatWriter.executedPlan.get
+
+        val plan = if (enabled) {
+          assert(executedPlan.isInstanceOf[WriteFilesExec])
+          executedPlan.asInstanceOf[WriteFilesExec].child
+        } else {
+          executedPlan.transformDown {
+            case a: AdaptiveSparkPlanExec => a.executedPlan
+          }
+        }
+
+        // assert the outer most sort in the executed plan
+        assert(plan.collectFirst {
+          case s: SortExec => s
+        }.map(s => (enabled, s)).exists {
+          case (false, SortExec(Seq(
+          SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _),
+          SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _)
+          ), false, _, _)) => true
+
+          // SPARK-40885: this bug removes the in-partition sort, which manifests here
+          case (true, SortExec(Seq(
+          SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
+          ), false, _, _)) => true
+          case _ => false
+        }, plan)
+      }
+    }
+  }
+
   test("v1 write with null and empty string column values") {
     withPlannedWrite { enabled =>
       withTempPath { path =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
