This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new af1615d026e [SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values
af1615d026e is described below

commit af1615d026eaf4aeec27ccfe3c58011ebbcb3de1
Author: Chao Sun <sunc...@apple.com>
AuthorDate: Thu Sep 7 18:19:54 2023 +0800

    [SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values
    
    ### What changes were proposed in this pull request?
    
    This PR makes sure the grouped partitions returned by `DataSourceV2ScanExec#groupPartitions` are sorted according to their partition values. Previously, in #42757, we assumed that Scala's `groupBy` would preserve the input ordering, but that is not the case.
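    
    As a rough illustration of the pattern (not the actual Spark code), the sketch below groups already-sorted `(key, partition)` pairs and then sorts the groups again. `Grouped` and `groupSorted` are hypothetical names, and plain `Int` keys stand in for the `InternalRow` partition values.
    
    ```scala
    // Hypothetical stand-in for KeyGroupedPartition: a partition value plus its splits.
    final case class Grouped(key: Int, parts: Seq[String])
    
    object GroupThenSort {
      // Group (key, partition) pairs by key, then sort the groups by key.
      // groupBy returns a Map whose iteration order is unspecified, so sorting
      // the input beforehand does not fix the order of the groups; the trailing
      // sortBy is what guarantees it.
      def groupSorted(input: Seq[(Int, String)]): Seq[Grouped] =
        input
          .sortBy(_._1)
          .groupBy(_._1)
          .toSeq
          .map { case (k, vs) => Grouped(k, vs.map(_._2)) }
          .sortBy(_.key)
    
      def main(args: Array[String]): Unit = {
        // Keys arrive in descending order, with key 3 appearing twice.
        val input = Seq.tabulate(10)(i => (9 - i, s"p$i")) :+ ((3, "p10"))
        groupSorted(input).foreach(println)
      }
    }
    ```
    
    Dropping the final `sortBy` may still appear to work for a handful of keys, which is why a missing sort of this kind can go unnoticed in small tests.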
    
    ### Why are the changes needed?
    
    See https://github.com/apache/spark/pull/42757#discussion_r1316926504 for the diagnosis. Partition ordering is a fundamental property of SPJ and thus must be guaranteed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    We have tests in `KeyGroupedPartitioningSuite` to cover this.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #42839 from sunchao/SPARK-45036-followup.
    
    Authored-by: Chao Sun <sunc...@apple.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../execution/datasources/v2/DataSourceV2ScanExecBase.scala   | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 94667fbd00c..b2f94cae2df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -143,17 +143,16 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
         // also sort the input partitions according to their partition key order. This ensures
         // a canonical order from both sides of a bucketed join, for example.
         val partitionDataTypes = expressions.map(_.dataType)
-        val partitionOrdering: Ordering[(InternalRow, InputPartition)] = {
-          RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1)
-        }
-        val sortedKeyToPartitions = results.sorted(partitionOrdering)
-        val groupedPartitions = sortedKeyToPartitions
+        val rowOrdering = RowOrdering.createNaturalAscendingOrdering(partitionDataTypes)
+        val sortedKeyToPartitions = results.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
+        val sortedGroupedPartitions = sortedKeyToPartitions
             .map(t => (InternalRowComparableWrapper(t._1, expressions), t._2))
             .groupBy(_._1)
             .toSeq
             .map { case (key, s) => KeyGroupedPartition(key.row, s.map(_._2)) }
+            .sorted(rowOrdering.on((k: KeyGroupedPartition) => k.value))
 
-        Some(KeyGroupedPartitionInfo(groupedPartitions, sortedKeyToPartitions.map(_._2)))
+        Some(KeyGroupedPartitionInfo(sortedGroupedPartitions, sortedKeyToPartitions.map(_._2)))
       }
     }
   }

