This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new af1615d026e [SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values

af1615d026e is described below

commit af1615d026eaf4aeec27ccfe3c58011ebbcb3de1
Author: Chao Sun <sunc...@apple.com>
AuthorDate: Thu Sep 7 18:19:54 2023 +0800

[SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values

### What changes were proposed in this pull request?

This PR ensures that the grouped partitions returned by `DataSourceV2ScanExec#groupPartitions` are sorted according to their partition values. In #42757 we assumed Scala would preserve the input ordering, but it does not: `groupBy` returns a `Map`, whose iteration order is not guaranteed to match the order of the sorted input.

### Why are the changes needed?

See https://github.com/apache/spark/pull/42757#discussion_r1316926504 for the diagnosis. Partition ordering is a fundamental property of storage-partitioned join (SPJ) and therefore must be guaranteed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests in `KeyGroupedPartitioningSuite` cover this.

### Was this patch authored or co-authored using generative AI tooling?

Closes #42839 from sunchao/SPARK-45036-followup.

Authored-by: Chao Sun <sunc...@apple.com>
Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../execution/datasources/v2/DataSourceV2ScanExecBase.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 94667fbd00c..b2f94cae2df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -143,17 +143,16 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
         // also sort the input partitions according to their partition key order. This ensures
         // a canonical order from both sides of a bucketed join, for example.
         val partitionDataTypes = expressions.map(_.dataType)
-        val partitionOrdering: Ordering[(InternalRow, InputPartition)] = {
-          RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1)
-        }
-        val sortedKeyToPartitions = results.sorted(partitionOrdering)
-        val groupedPartitions = sortedKeyToPartitions
+        val rowOrdering = RowOrdering.createNaturalAscendingOrdering(partitionDataTypes)
+        val sortedKeyToPartitions = results.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
+        val sortedGroupedPartitions = sortedKeyToPartitions
           .map(t => (InternalRowComparableWrapper(t._1, expressions), t._2))
           .groupBy(_._1)
           .toSeq
           .map { case (key, s) => KeyGroupedPartition(key.row, s.map(_._2)) }
+          .sorted(rowOrdering.on((k: KeyGroupedPartition) => k.value))
 
-        Some(KeyGroupedPartitionInfo(groupedPartitions, sortedKeyToPartitions.map(_._2)))
+        Some(KeyGroupedPartitionInfo(sortedGroupedPartitions, sortedKeyToPartitions.map(_._2)))
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
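
A minimal, standalone Scala sketch of the ordering pitfall this commit fixes, using only the standard library. It is an illustration under stated assumptions, not Spark's code: `Group`, `GroupOrderingDemo`, `groupUnsorted`, and `groupSorted` are hypothetical names, and plain `Int` keys with `Ordering.Int` stand in for the `InternalRow` partition values and `RowOrdering.createNaturalAscendingOrdering` used in the patch. `groupUnsorted` mirrors the old sort-then-`groupBy` pipeline; `groupSorted` adds the explicit re-sort after grouping, as the patch does.

```scala
// Hypothetical stand-in for Spark's KeyGroupedPartition: a partition key plus
// the input partitions (here just names) that share that key.
final case class Group(key: Int, parts: Seq[String])

object GroupOrderingDemo {
  // Natural ascending ordering on the key; plays the role that
  // RowOrdering.createNaturalAscendingOrdering plays in the patch (assumption).
  val keyOrdering: Ordering[Int] = Ordering.Int

  // Old pipeline: sort by key, then groupBy. groupBy returns a Map, and the
  // iteration order of that Map (and therefore of toSeq) is not guaranteed
  // to follow the sorted input.
  def groupUnsorted(input: Seq[(Int, String)]): Seq[Group] =
    input
      .sorted(keyOrdering.on((t: (Int, String)) => t._1))
      .groupBy(_._1)
      .toSeq
      .map { case (k, s) => Group(k, s.map(_._2)) }

  // Fixed pipeline: sort the groups by key after grouping, analogous to
  // `.sorted(rowOrdering.on((k: KeyGroupedPartition) => k.value))`.
  def groupSorted(input: Seq[(Int, String)]): Seq[Group] =
    groupUnsorted(input).sorted(keyOrdering.on((g: Group) => g.key))

  def main(args: Array[String]): Unit = {
    // 20 input partitions spread over 10 partition keys.
    val input = (0 until 20).map(i => (i % 10, s"p$i"))
    // The first line's key order is an implementation detail of the Map.
    println(s"groupBy only:      ${groupUnsorted(input).map(_.key)}")
    // The second line is always in ascending key order.
    println(s"groupBy then sort: ${groupSorted(input).map(_.key)}")
  }
}
```

Whether `groupUnsorted` happens to print its keys in order depends on the key type and on how the `Map` returned by `groupBy` lays out its entries, which can differ between Scala versions. That is why sorting explicitly after grouping is the robust choice, rather than relying on the order of the already-sorted input.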