[ https://issues.apache.org/jira/browse/SPARK-41914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan reassigned SPARK-41914:
-----------------------------------

    Assignee: Enrico Minack

> Sorting issue with partitioned-writing and planned write optimization disabled
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-41914
>                 URL: https://issues.apache.org/jira/browse/SPARK-41914
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Enrico Minack
>            Assignee: Enrico Minack
>            Priority: Major
>
> Spark 3.4.0 introduced option {{spark.sql.optimizer.plannedWrite.enabled}},
> which is enabled by default. When it is disabled, partitioned writing loses
> in-partition order when spilling occurs.
> This is related to SPARK-40885, where setting option
> {{spark.sql.optimizer.plannedWrite.enabled}} to {{true}} will remove the
> existing sort (for {{day}} and {{id}}) entirely.
> Run this with 512m memory and one executor, e.g.:
> {code}
> spark-shell --driver-memory 512m --master "local[1]"
> {code}
> {code:scala}
> import org.apache.spark.sql.SaveMode
>
> // Disable the planned write optimization to trigger the issue.
> spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", false)
>
> val ids = 2000000
> val days = 2
> val parts = 2
>
> // Cross join: every day is paired with every id.
> val ds = spark.range(0, days, 1, parts).withColumnRenamed("id", "day").join(spark.range(0, ids, 1, parts))
>
> // Each written file is expected to be sorted by id within its day partition.
> ds.repartition($"day")
>   .sortWithinPartitions($"day", $"id")
>   .write
>   .partitionBy("day")
>   .mode(SaveMode.Overwrite)
>   .csv("interleaved.csv")
> {code}
> Check that the written files are sorted (prints OK for each sorted file):
> {code:bash}
> for file in interleaved.csv/day\=*/part-*
> do
>   echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
> done | md5sum -c
> {code}
> Files should look like this:
> {code}
> 0
> 1
> 2
> ...
> 1048576
> 1048577
> 1048578
> ...
> {code}
> But they look like this:
> {code}
> 0
> 1048576
> 1
> 1048577
> 2
> 1048578
> ...
> {code}
> The root cause is the same as in SPARK-40588. A sort (for {{day}}) is
> added on top of the existing sort (for {{day}} and {{id}}).
> Spilling interleaves the sorted spill files.
> {code}
> Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
> +- AdaptiveSparkPlan isFinalPlan=false
>    +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [plan_id=30]
>          +- BroadcastNestedLoopJoin BuildLeft, Inner
>             :- BroadcastExchange IdentityBroadcastMode, [plan_id=28]
>             :  +- Project [id#0L AS day#2L]
>             :     +- Range (0, 2, step=1, splits=2)
>             +- Range (0, 2000000, step=1, splits=2)
> {code}
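
The interleaving described in the issue can be illustrated without Spark. The following is only a toy model under stated assumptions, not Spark's sorter: the names {{Rec}}, {{Run}} and {{mergeRuns}} are hypothetical, and a "spill" is modelled simply as cutting an already sorted input into two runs. It merges the runs with a {{java.util.PriorityQueue}}, once with a comparator on {{day}} only (like the extra sort on top) and once on {{day}} and {{id}}. Because the heap gives no ordering guarantee for elements that compare equal, the day-only merge is free to take rows from either run, so the {{id}} order within a day can be lost. It can be pasted into spark-shell or a plain Scala REPL.

```scala
import java.util.{Comparator, PriorityQueue}

// Toy model, NOT Spark code: a record is just (day, id).
final case class Rec(day: Long, id: Long)

// One sorted spill run with a cursor on its current head record (hypothetical helper).
final class Run(rows: Vector[Rec]) {
  private var pos = 0
  def head: Rec = rows(pos)
  // Move to the next record; returns false once the run is exhausted.
  def advance(): Boolean = { pos += 1; pos < rows.length }
}

// k-way merge of sorted runs, ordering the runs by their head record.
def mergeRuns(runs: Seq[Vector[Rec]], cmp: Comparator[Rec]): Vector[Rec] = {
  val byHead = new Comparator[Run] {
    def compare(a: Run, b: Run): Int = cmp.compare(a.head, b.head)
  }
  val queue = new PriorityQueue[Run](math.max(1, runs.size), byHead)
  runs.filter(_.nonEmpty).foreach(r => queue.add(new Run(r)))
  val out = Vector.newBuilder[Rec]
  while (!queue.isEmpty) {
    val run = queue.poll()               // run whose head is smallest under cmp
    out += run.head
    if (run.advance()) queue.add(run)    // re-insert only if records remain
  }
  out.result()
}

// Comparator of the extra sort: day only, so all ids of one day are ties.
val byDay = new Comparator[Rec] {
  def compare(a: Rec, b: Rec): Int = java.lang.Long.compare(a.day, b.day)
}

// Comparator of the original sort: day, then id.
val byDayAndId = new Comparator[Rec] {
  def compare(a: Rec, b: Rec): Int = {
    val c = java.lang.Long.compare(a.day, b.day)
    if (c != 0) c else java.lang.Long.compare(a.id, b.id)
  }
}

// Input already sorted by (day, id); pretend a spill happened after 3 records.
val input = (0L until 6L).map(id => Rec(0L, id)).toVector
val runs = input.grouped(3).toVector

val mergedByDay = mergeRuns(runs, byDay).map(_.id)
val mergedByDayAndId = mergeRuns(runs, byDayAndId).map(_.id)

println("merge on day only:   " + mergedByDay.mkString(" "))
println("merge on day and id: " + mergedByDayAndId.mkString(" "))
```

In this sketch the merge on {{day}} and {{id}} returns the ids in order, while the merge on {{day}} alone still contains every id but no longer in ascending order, which mirrors the pattern seen in the written files.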