This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c0911981887 [SPARK-41914][SQL] FileFormatWriter materializes AQE plan before accessing outputOrdering
c0911981887 is described below

commit c091198188789afb1282bc76419cf6e1397b0bc9
Author: Enrico Minack <git...@enrico.minack.dev>
AuthorDate: Tue Jan 10 13:10:07 2023 +0800

    [SPARK-41914][SQL] FileFormatWriter materializes AQE plan before accessing outputOrdering

### What changes were proposed in this pull request?
`FileFormatWriter` now materializes an `AdaptiveSparkPlanExec` plan before accessing the plan's `outputOrdering`. This is only required when planned writing is disabled (`spark.sql.optimizer.plannedWrite.enabled` is `true` by default, so only when it has been explicitly turned off). With planned writing enabled, `FileFormatWriter` is already given the final plan.

### Why are the changes needed?
`FileFormatWriter` enforces an ordering if the written plan does not provide that ordering. An `AdaptiveSparkPlanExec` node does not know its final ordering before it is materialized, so `FileFormatWriter` enforces the required ordering (e.g. by column `"a"`) even if the plan already provides a compatible ordering (e.g. by columns `"a", "b"`). If that enforced sort spills, the existing order (e.g. by columns `"a", "b"`) gets broken (see SPARK-40588).
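For illustration, a minimal sketch of the failure mode (hypothetical code, not part of the patch; the output path and column names are made up, and a running `SparkSession` named `spark` is assumed):

```scala
// Hypothetical repro sketch for the ordering issue described above.
// With AQE on and planned write off, FileFormatWriter cannot see the
// ("a", "b") ordering through the AdaptiveSparkPlanExec node, so it adds
// its own sort by "a"; if that sort spills, the in-partition order by "b"
// can be lost in the written files.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "false")

spark.range(0, 1000000)
  .selectExpr("id % 10 AS a", "id AS b")
  .sortWithinPartitions("a", "b")  // compatible with the required ordering by "a"
  .write
  .partitionBy("a")                // makes the writer require an ordering by "a"
  .parquet("/tmp/spark-40588-sketch")
```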
### Does this PR introduce _any_ user-facing change?
This fixes SPARK-40588 for 3.4; the bug was introduced in 3.0. This restores the behaviour of Spark 2.4.

### How was this patch tested?
The final plan that is written to files is now stored in `FileFormatWriter.executedPlan` (similar to the existing `FileFormatWriter.outputOrderingMatched`). Unit tests assert the outermost sort order written to files.

The actual plan written into the files changed from (taken from `"SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column"`):

```
Sort [input[2, int, false] ASC NULLS FIRST], false, 0
+- *(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
   +- *(3) Project [b#24, value#14, key#13]
      +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
         :- BroadcastQueryStage 2
         :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=376]
         :     +- AQEShuffleRead local
         :        +- ShuffleQueryStage 0
         :           +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [plan_id=328]
         :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
         :                 +- Scan[obj#12]
         +- AQEShuffleRead local
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [plan_id=345]
                  +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                     +- Scan[obj#22]
```

where `FileFormatWriter` enforces order with `Sort [input[2, int, false] ASC NULLS FIRST], false, 0`, to

```
*(3) Sort [key#13 ASC NULLS FIRST, value#14 ASC NULLS FIRST], false, 0
+- *(3) Project [b#24, value#14, key#13]
   +- *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
      :- BroadcastQueryStage 2
      :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=375]
      :     +- AQEShuffleRead local
      :        +- ShuffleQueryStage 0
      :           +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [plan_id=327]
      :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
      :                 +- Scan[obj#12]
      +- AQEShuffleRead local
         +- ShuffleQueryStage 1
            +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [plan_id=344]
               +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                  +- Scan[obj#22]
```

where the sort given by the user is now the outermost sort.

Closes #39431 from EnricoMi/branch-materialize-aqe-plan.

Authored-by: Enrico Minack <git...@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   2 +
 .../execution/datasources/FileFormatWriter.scala   |  36 ++++++-
 .../datasources/V1WriteCommandSuite.scala          | 115 +++++++++++++++++++--
 3 files changed, 138 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 62a75e75345..395e5468b64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -225,6 +225,8 @@ case class AdaptiveSparkPlanExec(
       .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }
 
+  def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan
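The new accessor runs the final-plan machinery with `identity`, so the first call materializes all query stages and later calls return the cached plan (note the `isFinalPlan` short-circuit in `getFinalPhysicalPlan()` above). A hedged usage sketch, assuming a DataFrame `df` in an AQE-enabled session (not part of the patch):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Surface the final plan behind the AQE wrapper; the wrapper itself does
// not know its final outputOrdering until its stages are materialized.
val plan: SparkPlan = df.queryExecution.executedPlan
val finalPlan: SparkPlan = plan match {
  case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan // executes stages once, then cached
  case p => p
}
println(finalPlan.outputOrdering)
```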
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 8ce87d6fbe1..6285095c647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -62,6 +63,11 @@ object FileFormatWriter extends Logging {
    */
   private[sql] var outputOrderingMatched: Boolean = false
 
+  /**
+   * A variable used in tests to check the final executed plan.
+   */
+  private[sql] var executedPlan: Option[SparkPlan] = None
+
   // scalastyle:off argcount
   /**
    * Basic work flow of this command is:
@@ -138,9 +144,21 @@ object FileFormatWriter extends Logging {
     val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
       writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
     val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)
+
+    // SPARK-40588: when planned writing is disabled and AQE is enabled,
+    // plan contains an AdaptiveSparkPlanExec, which does not know
+    // its final plan's ordering, so we have to materialize that plan first
+    // it is fine to use plan further down as the final plan is cached in that plan
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+
     // the sort order doesn't matter
     // Use the output ordering from the original plan before adding the empty2null projection.
-    val actualOrdering = writeFilesOpt.map(_.child).getOrElse(plan).outputOrdering.map(_.child)
+    val actualOrdering = writeFilesOpt.map(_.child)
+      .getOrElse(materializeAdaptiveSparkPlan(plan))
+      .outputOrdering.map(_.child)
     val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering)
 
     SQLExecution.checkSQLExecutionId(sparkSession)
@@ -198,19 +216,24 @@ object FileFormatWriter extends Logging {
     }
 
     writeAndCommit(job, description, committer) {
-      val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+      val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
+        (empty2NullPlan, None)
       } else {
         val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, outputSpec)
         val concurrentOutputWriterSpec = createConcurrentOutputWriterSpec(
           sparkSession, sortPlan, sortColumns)
         if (concurrentOutputWriterSpec.isDefined) {
-          (empty2NullPlan.execute(), concurrentOutputWriterSpec)
+          (empty2NullPlan, concurrentOutputWriterSpec)
         } else {
-          (sortPlan.execute(), concurrentOutputWriterSpec)
+          (sortPlan, concurrentOutputWriterSpec)
         }
       }
 
+      // In testing, this is the only way to get hold of the actually executed plan written to file
+      if (Utils.isTesting) executedPlan = Some(planToExecute)
+
+      val rdd = planToExecute.execute()
+
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
       // partition rdd to make sure we at least set up one write task to write the metadata.
       val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
@@ -281,6 +304,9 @@ object FileFormatWriter extends Logging {
     val committer = writeFilesSpec.committer
     val description = writeFilesSpec.description
 
+    // In testing, this is the only way to get hold of the actually executed plan written to file
+    if (Utils.isTesting) executedPlan = Some(planForWrites)
+
     writeAndCommit(job, description, committer) {
       val rdd = planForWrites.executeWrite(writeFilesSpec)
       val ret = new Array[WriteTaskResult](rdd.partitions.length)
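The ordering check that the materialized plan feeds into is prefix-based, which is why the plan's `("a", "b")` ordering from the description satisfies a required ordering by `"a"`. A simplified, string-based sketch of that idea (illustrative only; the real `V1WritesUtils.isOrderingMatched` compares Catalyst `SortOrder` expressions semantically, not column names):

```scala
// Prefix check: every required key must match the plan's ordering, in order.
def orderingMatched(required: Seq[String], actual: Seq[String]): Boolean =
  required.length <= actual.length &&
    required.zip(actual).forall { case (r, a) => r == a }

assert(orderingMatched(Seq("a"), Seq("a", "b")))   // no extra sort needed
assert(!orderingMatched(Seq("a"), Seq("b", "a")))  // writer would add a sort by "a"
```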
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index e9c5c77e6d9..80d0369044c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -18,10 +18,13 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, SortExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
+import org.apache.spark.sql.types.{IntegerType, StringType}
 import org.apache.spark.sql.util.QueryExecutionListener
 
 trait V1WriteCommandSuiteBase extends SQLTestUtils {
@@ -52,8 +55,7 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
   }
 
   /**
-   * Execute a write query and check ordering of the plan. Return the optimized logical write
-   * query plan.
+   * Execute a write query and check ordering of the plan.
    */
   protected def executeAndCheckOrdering(
       hasLogicalSort: Boolean,
@@ -160,12 +162,7 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
           |CREATE TABLE t(i INT, k STRING) USING PARQUET
           |PARTITIONED BY (j INT)
           |""".stripMargin)
-        // When planned write is disabled, even though the write plan is already sorted,
-        // the AQE node inserted on top of the write query will remove the original
-        // sort orders. So the ordering will not match. This issue does not exist when
-        // planned write is enabled, because AQE will be applied on top of the write
-        // command instead of on top of the child query plan.
-        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = enabled) {
+        executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
           sql("INSERT INTO t SELECT i, k, j FROM t0 ORDER BY j")
         }
       }
@@ -181,13 +178,111 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write
           |PARTITIONED BY (k STRING)
           |""".stripMargin)
         executeAndCheckOrdering(
-          hasLogicalSort = true, orderingMatched = enabled, hasEmpty2Null = enabled) {
+          hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
           sql("INSERT INTO t SELECT * FROM t0 ORDER BY k")
         }
       }
     }
   }
 
+  test("SPARK-41914: v1 write with AQE and in-partition sorted - non-string partition column") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withPlannedWrite { enabled =>
+        withTable("t") {
+          sql(
+            """
+              |CREATE TABLE t(b INT, value STRING) USING PARQUET
+              |PARTITIONED BY (key INT)
+              |""".stripMargin)
+          executeAndCheckOrdering(hasLogicalSort = true, orderingMatched = true) {
+            sql(
+              """
+                |INSERT INTO t
+                |SELECT b, value, key
+                |FROM testData JOIN testData2 ON key = a
+                |SORT BY key, value
+                |""".stripMargin)
+          }
+
+          // inspect the actually executed plan (that is different to executeAndCheckOrdering)
+          assert(FileFormatWriter.executedPlan.isDefined)
+          val executedPlan = FileFormatWriter.executedPlan.get
+
+          val plan = if (enabled) {
+            assert(executedPlan.isInstanceOf[WriteFilesExec])
+            executedPlan.asInstanceOf[WriteFilesExec].child
+          } else {
+            executedPlan.transformDown {
+              case a: AdaptiveSparkPlanExec => a.executedPlan
+            }
+          }
+
+          // assert the outer most sort in the executed plan
+          assert(plan.collectFirst {
+            case s: SortExec => s
+          }.exists {
+            case SortExec(Seq(
+              SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _),
+              SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
+            ), false, _, _) => true
+            case _ => false
+          }, plan)
+        }
+      }
+    }
+  }
+
+  test("SPARK-41914: v1 write with AQE and in-partition sorted - string partition column") {
+    withPlannedWrite { enabled =>
+      withTable("t") {
+        sql(
+          """
+            |CREATE TABLE t(key INT, b INT) USING PARQUET
+            |PARTITIONED BY (value STRING)
+            |""".stripMargin)
+        executeAndCheckOrdering(
+          hasLogicalSort = true, orderingMatched = true, hasEmpty2Null = enabled) {
+          sql(
+            """
+              |INSERT INTO t
+              |SELECT key, b, value
+              |FROM testData JOIN testData2 ON key = a
+              |SORT BY value, key
+              |""".stripMargin)
+        }
+
+        // inspect the actually executed plan (that is different to executeAndCheckOrdering)
+        assert(FileFormatWriter.executedPlan.isDefined)
+        val executedPlan = FileFormatWriter.executedPlan.get
+
+        val plan = if (enabled) {
+          assert(executedPlan.isInstanceOf[WriteFilesExec])
+          executedPlan.asInstanceOf[WriteFilesExec].child
+        } else {
+          executedPlan.transformDown {
+            case a: AdaptiveSparkPlanExec => a.executedPlan
+          }
+        }
+
+        // assert the outer most sort in the executed plan
+        assert(plan.collectFirst {
+          case s: SortExec => s
+        }.map(s => (enabled, s)).exists {
+          case (false, SortExec(Seq(
+            SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _),
+            SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _)
+          ), false, _, _)) => true
+
+          // SPARK-40885: this bug removes the in-partition sort, which manifests here
+          case (true, SortExec(Seq(
+            SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _)
+          ), false, _, _)) => true
+          case _ => false
+        }, plan)
+      }
+    }
+  }
+
   test("v1 write with null and empty string column values") {
     withPlannedWrite { enabled =>
       withTempPath { path =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org