EnricoMi commented on PR #38356: URL: https://github.com/apache/spark/pull/38356#issuecomment-1302534716
I took another, deeper look into this issue. The `V1Writes` rule introduced in Spark 3.4 adds `empty2null` to all nullable string partition columns: https://github.com/apache/spark/blob/7f3b5987de1f79434a861408e6c8bf55c5598031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala#L70-L94

The modified `V1WriteCommand` then has a modified `write.requiredOrdering` but the old `write.query.outputOrdering`, so the two no longer match. Because of that mismatch, `FileFormatWriter` adds the outer sort: https://github.com/apache/spark/blob/7f3b5987de1f79434a861408e6c8bf55c5598031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L172-L194

The inner `Sort` is optimized away because it is logically not needed. The result is:

```
*(1) Sort [p#39 ASC NULLS FIRST], false, 0
+- *(1) Project [id#30, sort_col#31, empty2null(p#32) AS p#39]
   +- ShuffleQueryStage 0
      +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=56]
         +- LocalTableScan [id#30, sort_col#31, p#32]
```

Either `FileFormatWriter` gains the ability to see that whatever `empty2null` references is already part of the ordering, so it does not add the outer `Sort`. Or `FileFormatWriter` and the `V1Writes` rule require no ordering at all when no bucketing spec is given. Why would you want to sort each partition by the partition columns when there is no bucket spec and no bucket sort?

```patch
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -140,10 +140,16 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )
 
-    // We should first sort by dynamic partition columns, then bucket id, and finally sorting
-    // columns.
-    val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
-      writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+    // Only require order when bucket spec is given
+    val requiredOrdering = if (writerBucketSpec.isEmpty) {
+      Seq.empty
+    } else {
+      // We should first sort by dynamic partition columns, then bucket id, and finally sorting
+      // columns.
+      partitionColumns.drop(numStaticPartitionCols) ++ writerBucketSpec.map(_.bucketIdExpression) ++
+        sortColumns
+    }
+    // the sort order doesn't matter
     // Use the output ordering from the original plan before adding the empty2null projection.
     val actualOrdering = plan.outputOrdering.map(_.child)
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -172,8 +172,10 @@ object V1WritesUtils {
     // Static partition must to be ahead of dynamic partition
     val dynamicPartitionColumns = partitionColumns.drop(numStaticPartitionCols)
 
-    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {
-      // Do not insert logical sort when concurrent output writers are enabled.
+    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty
+        || writerBucketSpec.isEmpty) {
+      // Do not insert logical sort when concurrent output writers are enabled
+      // or no bucket spec exists.
       Seq.empty
     } else {
       // We should first sort by dynamic partition columns, then bucket id, and finally sorting
```

Then, the query plan is:

```
*(1) Project [id#37, sort_col#38, empty2null(p#39) AS p#46]
+- *(1) Sort [p#39 ASC NULLS FIRST, sort_col#38 ASC NULLS FIRST], false, 0
   +- ShuffleQueryStage 0
      +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=56]
         +- LocalTableScan [id#37, sort_col#38, p#39]
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
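The ordering mismatch described above can be sketched in a minimal, self-contained Scala model. This is not Spark's actual `Expression`/`SortOrder` machinery; `Attr`, `Empty2Null`, and `orderingSatisfied` are hypothetical stand-ins that only illustrate why a structural comparison of `empty2null(p)` against `p` fails and triggers the extra outer `Sort`:

```scala
// Hypothetical, simplified model of FileFormatWriter's ordering check.
sealed trait Expr
case class Attr(name: String) extends Expr       // a plain column reference, e.g. p#39
case class Empty2Null(child: Expr) extends Expr  // V1Writes wraps partition columns in this

// Stand-in for "does the query's output ordering satisfy the required ordering?"
// The required expressions must match the actual ones position by position.
def orderingSatisfied(required: Seq[Expr], actual: Seq[Expr]): Boolean =
  required.length <= actual.length &&
    required.zip(actual).forall { case (r, a) => r == a }

val p = Attr("p")
// Before V1Writes: required and actual ordering both reference p => no extra Sort.
assert(orderingSatisfied(Seq(p), Seq(p)))
// After V1Writes: required ordering is on empty2null(p), actual is still on p
// => mismatch => FileFormatWriter inserts the outer Sort.
assert(!orderingSatisfied(Seq(Empty2Null(p)), Seq(p)))
```

Making `FileFormatWriter` "see through" the rewrite would mean comparing `Empty2Null(p)` against `p` by the referenced attribute rather than structurally; dropping the required ordering entirely when there is no bucket spec, as in the patch above, sidesteps the comparison altogether.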
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org