This is a hard one. Spark duplicates the join child plan for a self-join
because Spark does not support diamond-shaped query plans. It seems the only
option here is to write the join child plan to a parquet table (or through a
shuffle) and read it back.
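
A rough sketch of that workaround, applied to the df3 from the repro further
down (the parquet path and val names here are only placeholders, and this
assumes the same spark-shell session as the repro):

// materialize the join child plan so spark_partition_id() is evaluated only once
df3.write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet("df3-materialized.parquet")
val df3Frozen = spark.read.parquet("df3-materialized.parquet")

// both join sides now read the same frozen partition ids
val counts = df3Frozen.groupBy($"partition_id").count
val joined = df3Frozen.join(counts, Seq("partition_id"), "full_outer")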

On Mon, Aug 1, 2022 at 4:46 PM Enrico Minack <m...@enrico.minack.dev> wrote:

> Hi all,
>
> This holds for any non-deterministic function, e.g. rand(), uuid(),
> spark_partition_id(), ... Details below.
>
> To my knowledge, marking a function non-deterministic is meant to prevent it
> from being evaluated more than once per row, which is violated here.
>
>    read parquet          read parquet
>         |                     |
> NON-DETERMINIST-FUNC  NON-DETERMINIST-FUNC
>         |                     |
>          \                   /
>           \                 /
>      join on non-deterministic id
>                    |
>                   ...
>
> What I would expect is
>
>             read parquet
>                  |
>          NON-DETERMINIST-FUNC
>              /       \
>             /         \
>             \         /
>              \       /
>    join on non-deterministic id
>                  |
>                 ...
>
>
> Any thoughts?
>
> Enrico
>
>
>
> On 27.06.22 at 11:21, Enrico Minack wrote:
>
> Hi devs,
>
> The SQL function spark_partition_id() provides the partition number for each
> row. Using a Dataset enriched with that number in a self-join may produce
> unexpected results.
>
> What I am doing:
>
> After calling spark_partition_id(), I count the rows per partition and then
> join those counts back to the dataset on the partition id.
>
> *After adding some downstream operations, the join ends up with mismatching
> partition ids:*
>
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.{col, rand, spark_partition_id, sum}
> import spark.implicits._
>
> val cols = 10
> // write 100 partitions of example data and read them back
> spark.range(1, 1000000, 1, 100).write.mode(SaveMode.Overwrite).parquet("example-data.parquet")
> val df1 = spark.read.parquet("example-data.parquet")
> // add `cols` random columns
> val df2 = df1.select($"id" +: 1.to(cols).map(column => rand().as(column.toString)): _*)
> // enrich each row with its partition id
> val df3 = df2.orderBy($"id").withColumn("partition_id", spark_partition_id())
> // count rows per partition and join the counts back on the partition id
> val df4 = df3.groupBy($"partition_id").count
> val df5 = df3.join(df4, Seq("partition_id"), "full_outer")
> val df6 = df5.groupBy($"partition_id").agg(sum($"count").as("rows in partition"),
>   1.to(cols).map(column => sum(col(column.toString))).reduce(_ + _).as("aggregates"))
> df6.orderBy($"partition_id").show
>
> +------------+-----------------+------------------+
> |partition_id|rows in partition|        aggregates|
> +------------+-----------------+------------------+
> |           0|      11937639870|298035.81962027296|
> |           1|      11986710088| 300833.1871222975|
> |           2|      12060904752| 301299.8924913869|
> |           3|      12049668900| 298595.2881827633|
> |           4|      11837825400|298083.34705855395|
> |           5|             null|  301525.597300101|
> |           6|             null|  293885.554873316|
> ...
> +------------+-----------------+------------------+
>
> Some partition ids do not have counts.
>
> The spark_partition_id() function is planned twice in the final Spark plan,
> and the two instances see different partitioning schemes, resulting in
> different sets of partition ids.
>
> Joining those together produces mismatches:
>
>    read parquet        read parquet
>         |                   |
>      exchange            exchange
>   17 partitions       12 partitions
>         |                   |
> spark_partition_id  spark_partition_id
>         |                    |
>         |          count per partition id
>          \                 /
>           \               /
>         join on partition id
>                   |
>                  ...
>
> I would have expected the Dataset with partition id to have been reused:
>
>             read parquet
>                  |
>          spark_partition_id
>              /       \
>             /         \
>            /    count per partition id
>           /           /
>           \          /
>         join on partition id
>                   |
>                  ...
>
>
> I understand that spark_partition_id is non-deterministic, because it
> depends on the existing partitioning. But having it planned twice, and
> therefore being sensitive to query planning, was surprising.
>
> *Questions:*
>
> Is this expected? Is this behavior fully covered by "@note This is
> non-deterministic because it depends on data partitioning and task
> scheduling."?
>
> Is there a way to "materialize" the partition ids (other than caching or
> check-pointing) so that downstream operations do not see different values
> for this column?
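>
> (For reference, the caching workaround I would like to avoid looks roughly
> like this; val names are just placeholders:
>
> val df3Cached = df3.cache()  // or df3.localCheckpoint() / df3.checkpoint()
> df3Cached.count              // materialize once, so both join branches reuse the same partition ids
> val df4 = df3Cached.groupBy($"partition_id").count
> val df5 = df3Cached.join(df4, Seq("partition_id"), "full_outer")
>
> but it has to hold the whole dataset in memory or on disk.)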
>
> Is there a way to tell Catalyst to stop optimizing across
> spark_partition_id (i.e. optimize the plan below it separately from the
> plan above it), like AnalysisBarrier used to do?
>
>
> Cheers,
> Enrico
>
>
> --
> Dr.-Ing. Enrico Minack
> Freiberuflicher Software Ingenieur
>
> e-mail: m...@enrico.minack.dev
> Teams: te...@enrico.minack.dev
> Skype: sk...@enrico.minack.dev
>
> Mob: +49 179 253 55 33 · Fon: +49 5108 60 29 735 · Fax: +49 3212 144 75 13
> Südstraße 21 · 30989 Gehrden · Germany · USt-IdNr.: DE325828631
>
