This is a hard one. Spark duplicates the join child plan for a self-join because Spark does not support diamond-shaped query plans. The only option here seems to be to write the join child plan out to a Parquet table (or materialize it via a shuffle) and read it back, so both sides of the join see the same data.
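A minimal sketch of that workaround in spark-shell (path and variable names are illustrative, not from the thread; assumes `df2` from Enrico's example below):

```scala
// Sketch: materialize the join child once by writing it out, then read it
// back, so both sides of the self-join are fed from the same stored data
// instead of two independently planned (and re-partitioned) copies.
import org.apache.spark.sql.functions.spark_partition_id

val df3 = df2.orderBy($"id").withColumn("partition_id", spark_partition_id())
df3.write.mode("overwrite").parquet("materialized.parquet")

// partition ids are now fixed in the stored data
val stable = spark.read.parquet("materialized.parquet")
val counts = stable.groupBy($"partition_id").count
val joined = stable.join(counts, Seq("partition_id"), "full_outer")
```

The write breaks the plan into two independent jobs, so Catalyst can no longer plan the non-deterministic column twice.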
On Mon, Aug 1, 2022 at 4:46 PM Enrico Minack <m...@enrico.minack.dev> wrote:
> Hi all,
>
> this holds for any non-deterministic function, e.g. rand(), uuid(),
> spark_partition_id(), ... Details below.
>
> To my knowledge, marking a function non-deterministic is to prevent it
> from being called multiple times per row, which is violated here.
>
>      read parquet            read parquet
>           |                       |
>   NON-DETERMINIST-FUNC    NON-DETERMINIST-FUNC
>           |                       |
>            \                     /
>             \                   /
>         join on non-deterministic id
>                   |
>                  ...
>
> What I would expect is
>
>          read parquet
>               |
>      NON-DETERMINIST-FUNC
>            /     \
>           /       \
>           \       /
>            \     /
>     join on non-deterministic id
>               |
>              ...
>
> Any thoughts?
>
> Enrico
>
>
> On 27.06.22 at 11:21, Enrico Minack wrote:
>
> Hi devs,
>
> SQL function spark_partition_id() provides the partition number for each
> row. Using a Dataset that is enriched with that number in a self-join may
> produce an unexpected result.
>
> What I am doing:
>
> After calling spark_partition_id() I am counting rows per partition, then
> joining those counts back to the dataset on the partition id.
> *Adding some downstream operations, the join has some mismatching
> partition ids:*
>
> import org.apache.spark.sql.SaveMode
>
> val cols = 10
> spark.range(1, 1000000, 1, 100).write.mode(SaveMode.Overwrite).parquet("example-data.parquet")
> val df1 = spark.read.parquet("example-data.parquet")
> val df2 = df1.select($"id" +: 1.to(cols).map(column => rand().as(column.toString)): _*)
> val df3 = df2.orderBy($"id").withColumn("partition_id", spark_partition_id())
> val df4 = df3.groupBy($"partition_id").count
> val df5 = df3.join(df4, Seq("partition_id"), "full_outer")
> val df6 = df5.groupBy($"partition_id").agg(sum($"count").as("rows in partition"), 1.to(cols).map(column => sum(col(column.toString))).reduce(_ + _).as("aggregates"))
> df6.orderBy($"partition_id").show
>
> +------------+-----------------+------------------+
> |partition_id|rows in partition|        aggregates|
> +------------+-----------------+------------------+
> |           0|      11937639870|298035.81962027296|
> |           1|      11986710088| 300833.1871222975|
> |           2|      12060904752| 301299.8924913869|
> |           3|      12049668900| 298595.2881827633|
> |           4|      11837825400|298083.34705855395|
> |           5|             null|  301525.597300101|
> |           6|             null|   293885.554873316|
> ...
> +------------+-----------------+------------------+
>
> Some partition ids do not have counts.
>
> The spark_partition_id() function is planned twice in the final Spark
> plan, where each occurrence has a different partitioning scheme, resulting
> in a different number of partition ids.
>
> Joining those together produces mismatches:
>
>     read parquet           read parquet
>          |                      |
>       exchange              exchange
>    17 partitions         12 partitions
>          |                      |
>   spark_partition_id    spark_partition_id
>          |                      |
>          |             count per partition id
>           \                    /
>            \                  /
>           join on partition id
>                   |
>                  ...
>
> I would have expected the Dataset with partition id to have been reused:
>
>       read parquet
>            |
>    spark_partition_id
>         /      \
>        /        \
>       /    count per partition id
>       \        /
>        \      /
>    join on partition id
>            |
>           ...
>
> I understand that spark_partition_id is non-deterministic because it
> depends on the existing partitioning. But being planned twice and
> therefore being sensitive to query planning was surprising.
>
> *Questions:*
>
> Is this expected? Is this behavior fully covered by "@note This is
> non-deterministic because it depends on data partitioning and task
> scheduling."?
>
> Is there a way to "materialize" the partition ids (other than caching or
> check-pointing) so that downstream operations do not see different values
> for this column?
>
> Is there a way to tell Catalyst to stop optimizing across
> spark_partition_id (optimizing the plan before it separately from the plan
> after it), like the AnalysisBarrier used to do?
>
>
> Cheers,
> Enrico
>
>
> --
> Dr.-Ing. Enrico Minack
> Freelance Software Engineer
>
> e-mail: m...@enrico.minack.dev
> Teams: te...@enrico.minack.dev
> Skype: sk...@enrico.minack.dev
>
> Mob: +49 179 253 55 33 · Fon: +49 5108 60 29 735 · Fax: +49 3212 144 75 13
> Südstraße 21 · 30989 Gehrden · Germany · USt-IdNr.: DE325828631
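The duplication Enrico describes for general non-deterministic functions can be seen with an even smaller sketch (illustrative, for spark-shell; not from the original thread):

```scala
// Self-join on a non-deterministic column: Spark duplicates the child plan,
// so rand() is re-evaluated independently on each side of the join and the
// join keys on the two sides no longer agree.
import org.apache.spark.sql.functions.rand

val withRnd = spark.range(10).withColumn("rnd", rand())
val counts  = withRnd.groupBy($"rnd").count

// Both branches re-plan the range scan and re-evaluate rand(), so most keys
// fail to match and the full outer join is padded with nulls on both sides.
withRnd.join(counts, Seq("rnd"), "full_outer").show
withRnd.join(counts, Seq("rnd"), "full_outer").explain
```

The `explain` output shows the `rand()` projection appearing twice in the physical plan, once under each join child, which is the diamond-vs-tree issue from the reply above.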