[ https://issues.apache.org/jira/browse/SPARK-41914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-41914:
-----------------------------------

    Assignee: Enrico Minack

> Sorting issue with partitioned-writing and planned write optimization disabled
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-41914
>                 URL: https://issues.apache.org/jira/browse/SPARK-41914
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Enrico Minack
>            Assignee: Enrico Minack
>            Priority: Major
>
> Spark 3.4.0 introduced the option {{spark.sql.optimizer.plannedWrite.enabled}}, 
> which is enabled by default. When it is disabled, partitioned writing loses the 
> in-partition order when spilling occurs.
> This is related to SPARK-40885, where setting option 
> {{spark.sql.optimizer.plannedWrite.enabled}} to {{true}} removes the 
> existing sort (for {{day}} and {{id}}) entirely.
> Run this with 512m memory and one executor, e.g.:
> {code}
> spark-shell --driver-memory 512m --master "local[1]"
> {code}
> {code:scala}
> import org.apache.spark.sql.SaveMode
> spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", false)
> val ids = 2000000
> val days = 2
> val parts = 2
> val ds = spark.range(0, days, 1, parts).withColumnRenamed("id", "day").join(spark.range(0, ids, 1, parts))
> ds.repartition($"day")
>   .sortWithinPartitions($"day", $"id")
>   .write
>   .partitionBy("day")
>   .mode(SaveMode.Overwrite)
>   .csv("interleaved.csv")
> {code}
> Check that the written files are sorted (prints {{OK}} when a file is sorted):
> {code:bash}
> for file in interleaved.csv/day\=*/part-*
> do
>   echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
> done | md5sum -c
> {code}
> Files should look like this:
> {code}
> 0
> 1
> 2
> ...
> 1048576
> 1048577
> 1048578
> ...
> {code}
> But they look like this:
> {code}
> 0
> 1048576
> 1
> 1048577
> 2
> 1048578
> ...
> {code}
> The cause is the same as in SPARK-40588. A sort (for {{day}}) is 
> added on top of the existing sort (for {{day}} and {{id}}). Spilling 
> interleaves the sorted spill files.
> {code}
> Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
> +- AdaptiveSparkPlan isFinalPlan=false
>    +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
>          +- BroadcastNestedLoopJoin BuildLeft, Inner
>             :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
>             :  +- Project [id#0L AS day#2L]
>             :     +- Range (0, 2, step=1, splits=2)
>             +- Range (0, 2000000, step=1, splits=2)
> {code}
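> To illustrate why that extra sort by {{day}} alone interleaves the output, here is a 
> small standalone sketch (plain Scala, not Spark internals; the values mirror the 
> listing above): merging two spill files that are each sorted by ({{day}}, {{id}}) 
> while comparing only {{day}} treats all rows of a partition as equal, so the merge 
> may alternate between the inputs and lose the {{id}} order.
> {code:scala}
> // Standalone sketch, not Spark internals: two spill files, each sorted by (day, id).
> val spill1 = Seq((0L, 0L), (0L, 1L), (0L, 2L))                   // (day, id)
> val spill2 = Seq((0L, 1048576L), (0L, 1048577L), (0L, 1048578L)) // (day, id)
> // A merge that orders only by `day` sees all these rows as equal and may
> // alternate between the two inputs, losing the order of `id`:
> val merged = spill1.zip(spill2).flatMap { case (a, b) => Seq(a, b) }
> merged.map(_._2).foreach(println) // 0, 1048576, 1, 1048577, 2, 1048578
> {code}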



