More insights on configuration regarding this issue:

With spark.sql.adaptive.enabled set to true, this fails for all 3.x versions except master (3.4.0-SNAPSHOT). When set to false, it works as expected for all versions.

With spark.sql.adaptive.enabled set to true and spark.sql.adaptive.coalescePartitions.enabled set to false, it still fails for all versions before 3.4.0.
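
For reference, this is how we toggle these settings in the spark-shell when reproducing the issue (a minimal sketch, using only the config keys discussed above):

// disabling AQE entirely restores the sorted output on all tested 3.x versions
spark.conf.set("spark.sql.adaptive.enabled", "false")

// keeping AQE enabled but disabling only partition coalescing still fails before 3.4.0
// spark.conf.set("spark.sql.adaptive.enabled", "true")
// spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")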


Enrico


On 11.10.22 at 12:15, Enrico Minack wrote:

Hi Devs,

this has been raised by Swetha on the user mailing list; the same issue hit us recently as well.

Here is the question again:

*Is it guaranteed that written files are sorted as specified by sortWithinPartitions?*

ds.repartition($"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .csv("interleaved.csv")

This construct is a common way to generate partitioned and sorted files, where downstream systems depend on the guaranteed order.

Instead of

0
1
2
3
4
...
9999995
9999996
9999997
9999998
9999999

You get

0
8388608
1
8388609
2
8388610
3
8388611
4
...
1611390
9999998
1611391
9999999
1611392
1611393
1611394
...
8388600
8388601
8388602
8388603
8388604
8388605
8388606
8388607

It used to work until 3.0.3. *Was this guaranteed to work or just happened to be correct?*

It stopped working with 3.1.0, but we can work around it by setting spark.sql.adaptive.coalescePartitions.enabled="false". *Is that guaranteed to fix it?*

With 3.2.x and 3.3.x, the workaround does not work. *Is there a workaround?*

It has been fixed in 3.4.0-SNAPSHOT. *Was that fixed intentionally or accidentally?*


Code to reproduce:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SaveMode

val ids = 10000000
val days = 10

case class Value(day: Long, id: Long)

val ds = spark.range(days).withColumnRenamed("id", "day").join(spark.range(ids)).as[Value]

// days * 10 partitions are required to reproduce, as well as sufficiently large values for ids (10m) and days (10)
ds.repartition(days * 10, $"day")
  .sortWithinPartitions($"day", $"id")
  .write
  .partitionBy("day")
  .mode(SaveMode.Overwrite)
  .csv("interleaved.csv")

val df = spark.read.schema(Encoders.product[Value].schema).csv("interleaved.csv")

Check that the written files are sorted (prints OK for each file that is sorted):

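# compute the md5 of each part file's numerically sorted content; md5sum -c then
# reports OK only when that matches the md5 of the file as written, i.e. when the
# file was already sorted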
for file in interleaved.csv/day\=*/part-*
do
  echo "$(sort -n "$file" | md5sum | cut -d " " -f 1)  $file"
done | md5sum -c
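
For completeness, a small plain-Scala sketch of the same check that can be pasted into any Scala REPL (it assumes the layout written by the reproduction above, where each part file contains only the id column because day is a partition column):

import java.io.File
import scala.io.Source

// collect all part files under the partitioned output directory
val dayDirs = new File("interleaved.csv").listFiles.filter(_.getName.startsWith("day="))
val partFiles = dayDirs.flatMap(_.listFiles).filter(_.getName.startsWith("part-"))

// verify that ids are non-decreasing within each file
partFiles.foreach { file =>
  val source = Source.fromFile(file)
  val ids = try source.getLines().map(_.toLong).toVector finally source.close()
  val sorted = ids.zip(ids.drop(1)).forall { case (a, b) => a <= b }
  println(s"${if (sorted) "OK" else "NOT SORTED"}  $file")
}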


Thanks for your background knowledge on this.

Cheers,
Enrico
