More insights on configuration regarding this issue:
With spark.sql.adaptive.enabled set to true, this fails for all 3.x
versions except master (3.4.0-SNAPSHOT). When set to false, it works
as expected for all versions.
With spark.sql.adaptive.enabled set to true and
spark.sql.adaptive.coalescePartitions.enabled set to false, it still
fails for all versions before 3.4.0.
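For reference, these settings can be toggled in a spark-shell session like this (these are real Spark config keys; the comments just restate the observations above):

```scala
// Works on all versions: disable adaptive query execution entirely.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Still fails before 3.4.0: AQE on, but partition coalescing off.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
```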
Enrico
On 11.10.22 at 12:15, Enrico Minack wrote:
Hi Devs,
this was raised by Swetha on the user mailing list; the same issue
also hit us recently.
Here is the question again:
*Is it guaranteed that written files are sorted as stated in
sortWithinPartitions?*
ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .csv("interleaved.csv")
This construct is a common way to generate partitioned and sorted
files, where downstream systems depend on the guaranteed order.
Instead of
0
1
2
3
4
...
9999995
9999996
9999997
9999998
9999999
You get
0
8388608
1
8388609
2
8388610
3
8388611
4
...
1611390
9999998
1611391
9999999
1611392
1611393
1611394
...
8388600
8388601
8388602
8388603
8388604
8388605
8388606
8388607
It used to work until 3.0.3. *Was this guaranteed to work or just
happened to be correct?*
It stopped working with 3.1.0, but we can work around it by setting
spark.sql.adaptive.coalescePartitions.enabled="false". *Is that
guaranteed to fix it?*
With 3.2.x and 3.3.x, the workaround does not work. *Is there a
workaround?*
It has been fixed in 3.4.0-SNAPSHOT. *Was that fixed intentionally or
accidentally?*
Code to reproduce:
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SaveMode

val ids = 10000000
val days = 10

case class Value(day: Long, id: Long)

val ds = spark.range(days).withColumnRenamed("id", "day")
  .join(spark.range(ids))
  .as[Value]

// days * 10 partitions are required, as well as sufficiently large
// values for ids (10m) and days (10)
ds.repartition(days * 10, $"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .mode(SaveMode.Overwrite)
  .csv("interleaved.csv")

val df = spark.read.schema(Encoders.product[Value].schema).csv("interleaved.csv")
Check that the written files are sorted (prints OK when they are):
for file in interleaved.csv/day\=*/part-*
do
  echo "$(sort -n "$file" | md5sum | cut -d " " -f 1) $file"
done | md5sum -c
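The same check can be sketched in plain Scala, without the shell (assuming each part file contains one id per line, which is what the CSV writer produces here since day is the partition column; isSorted and checkOutput are illustrative names, not part of any API):

```scala
import java.nio.file.{Files, Path, Paths}
import scala.jdk.CollectionConverters._

// True if the sequence of ids is non-decreasing.
def isSorted(ids: Seq[Long]): Boolean =
  ids.zip(ids.drop(1)).forall { case (a, b) => a <= b }

// Walks the output directory and checks every part file.
// Assumes each line of a part file holds a single id (the day value
// is encoded in the directory name, not in the file itself).
def checkOutput(base: Path): Boolean =
  Files.walk(base).iterator().asScala
    .filter(_.getFileName.toString.startsWith("part-"))
    .forall { p =>
      isSorted(Files.readAllLines(p).asScala.filter(_.nonEmpty).map(_.toLong))
    }

// e.g.: println(if (checkOutput(Paths.get("interleaved.csv"))) "OK" else "NOT SORTED")
```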
Thanks for sharing any background knowledge on this.
Cheers,
Enrico