EnricoMi commented on PR #38356:
URL: https://github.com/apache/spark/pull/38356#issuecomment-1302534716

   I took another, deeper look at this issue. The `V1Writes` rule introduced in Spark 3.4 wraps all nullable string partition columns in `empty2null`:
   
https://github.com/apache/spark/blob/7f3b5987de1f79434a861408e6c8bf55c5598031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala#L70-L94
   
   The rewritten `V1WriteCommand` then has a modified `write.requiredOrdering` but still the old `write.query.outputOrdering`, so the two no longer match. Because of that mismatch, `FileFormatWriter` adds the outer sort:
   
https://github.com/apache/spark/blob/7f3b5987de1f79434a861408e6c8bf55c5598031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L172-L194
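
To make the mismatch concrete, here is a minimal toy model of the ordering check. It is only a sketch: `Attr`, `Empty2Null` and `needsSort` are hypothetical stand-ins rather than Spark's Catalyst classes, and plain equality stands in for the semantic comparison that `FileFormatWriter` performs. The attribute ids are taken from the plan shown below.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark classes.
object OrderingMismatchSketch {
  sealed trait Expr
  final case class Attr(name: String, id: Int) extends Expr
  final case class Empty2Null(child: Expr) extends Expr

  // A sort is added unless every required expression matches the
  // expression at the same position in the actual ordering.
  def needsSort(required: Seq[Expr], actual: Seq[Expr]): Boolean =
    required.length > actual.length ||
      required.zip(actual).exists { case (r, a) => r != a }

  def main(args: Array[String]): Unit = {
    val p = Attr("p", 32)
    val sortCol = Attr("sort_col", 31)
    // After the rule, the partition column is the alias p#39 over empty2null(p#32).
    val pRewritten = Attr("p", 39)
    // Ordering produced by the user's own sort within partitions.
    val actual = Seq(p, sortCol)

    println(needsSort(Seq(p), actual))          // false: ordering is reused
    println(needsSort(Seq(pRewritten), actual)) // true: outer Sort is added
  }
}
```

The second call mirrors the situation described above: the required ordering refers to the rewritten column, the actual ordering still refers to the original one, so the check fails even though the data is already grouped by `p`.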
   
   The inner `Sort` is then optimized away because it is logically no longer needed. The result is:
   
   ```
   *(1) Sort [p#39 ASC NULLS FIRST], false, 0
   +- *(1) Project [id#30, sort_col#31, empty2null(p#32) AS p#39]
      +- ShuffleQueryStage 0
         +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=56]
            +- LocalTableScan [id#30, sort_col#31, p#32]
   ```
   
   Either `FileFormatWriter` learns to recognize that the expression wrapped by `empty2null` is already part of the existing ordering, so that it does not add the outer `Sort`.
   
   Or `FileFormatWriter` and the `V1Writes` rule stop requiring any ordering when no bucketing spec is given. Why would you want to sort each partition by the partition columns when there is no bucket spec and no bucket sort?
   
   ```patch
   --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
   +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
   @@ -140,10 +140,16 @@ object FileFormatWriter extends Logging {
          statsTrackers = statsTrackers
        )
    
   -    // We should first sort by dynamic partition columns, then bucket id, and finally sorting
   -    // columns.
   -    val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
   -        writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
   +    // Only require order when bucket spec is given
   +    val requiredOrdering = if (writerBucketSpec.isEmpty) {
   +      Seq.empty
   +    } else {
   +      // We should first sort by dynamic partition columns, then bucket id, and finally sorting
   +      // columns.
   +      partitionColumns.drop(numStaticPartitionCols) ++ writerBucketSpec.map(_.bucketIdExpression) ++
   +        sortColumns
   +    }
   +
        // the sort order doesn't matter
        // Use the output ordering from the original plan before adding the empty2null projection.
        val actualOrdering = plan.outputOrdering.map(_.child)
   --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
   +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
   @@ -172,8 +172,10 @@ object V1WritesUtils {
        // Static partition must to be ahead of dynamic partition
        val dynamicPartitionColumns = partitionColumns.drop(numStaticPartitionCols)
    
   -    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty) {
   -      // Do not insert logical sort when concurrent output writers are enabled.
   +    if (SQLConf.get.maxConcurrentOutputFileWriters > 0 && sortColumns.isEmpty
   +      || writerBucketSpec.isEmpty) {
   +      // Do not insert logical sort when concurrent output writers are enabled
   +      // or no bucket spec exists.
          Seq.empty
        } else {
          // We should first sort by dynamic partition columns, then bucket id, and finally sorting
   ```
   
   With this patch, the query plan becomes:
   ```
   *(1) Project [id#37, sort_col#38, empty2null(p#39) AS p#46]
   +- *(1) Sort [p#39 ASC NULLS FIRST, sort_col#38 ASC NULLS FIRST], false, 0
      +- ShuffleQueryStage 0
         +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=56]
            +- LocalTableScan [id#37, sort_col#38, p#39]
   ```
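
For comparison, the first option (letting the ordering check see through `empty2null`) could look roughly like the toy model below. This is a sketch under assumptions, not a proposed implementation: `Attr`, `Empty2Null`, `stripEmpty2Null` and `orderingMatched` are hypothetical names, plain equality stands in for Catalyst's semantic comparison, and whether ignoring `empty2null` is safe for every write path is not established here.

```scala
// Toy model only: these types and helpers are hypothetical, not Spark classes.
object SeeThroughEmpty2NullSketch {
  sealed trait Expr
  final case class Attr(name: String, id: Int) extends Expr
  final case class Empty2Null(child: Expr) extends Expr

  // Unwrap empty2null so the comparison sees the underlying column.
  def stripEmpty2Null(e: Expr): Expr = e match {
    case Empty2Null(child) => stripEmpty2Null(child)
    case other             => other
  }

  // The required ordering matches when it is a positional prefix of the
  // actual ordering, comparing expressions after unwrapping empty2null.
  def orderingMatched(required: Seq[Expr], actual: Seq[Expr]): Boolean =
    required.length <= actual.length &&
      required.zip(actual).forall { case (r, a) =>
        stripEmpty2Null(r) == stripEmpty2Null(a)
      }

  def main(args: Array[String]): Unit = {
    val p = Attr("p", 39)
    val sortCol = Attr("sort_col", 38)
    val required = Seq[Expr](Empty2Null(p)) // what the write requires
    val actual = Seq[Expr](p, sortCol)      // what the user's sort provides

    println(orderingMatched(required, actual)) // true: no outer Sort needed
  }
}
```

In this model the user's sort on `p, sort_col` satisfies the write's requirement on `empty2null(p)`, so no extra `Sort` would be added on top of it.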
   

