[ https://issues.apache.org/jira/browse/SPARK-38282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17495899#comment-17495899 ]

Tanel Kiis commented on SPARK-38282:
------------------------------------

[~cloud_fan], any ideas how to improve this? I could submit a PR, but I'm not sure what the best approach would be here.

> Avoid duplicating complex partitioning expressions
> --------------------------------------------------
>
>                 Key: SPARK-38282
>                 URL: https://issues.apache.org/jira/browse/SPARK-38282
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Tanel Kiis
>            Priority: Major
>
> Spark duplicates every non-trivial expression in Window.partitionBy once per window function that uses it, which results in duplicate exchanges and WindowExec nodes.
> An example unit test:
> {code}
> test("SPARK-38282: Avoid duplicating complex partitioning expressions") {
>   val group = functions.col("id") % 2
>   val min = functions.min("id").over(Window.partitionBy(group))
>   val max = functions.max("id").over(Window.partitionBy(group))
>   val df1 = spark.range(1, 4)
>     .withColumn("ratio", max / min)
>   val df2 = spark.range(1, 4)
>     .withColumn("min", min)
>     .withColumn("max", max)
>     .select(col("id"), (col("max") / col("min")).as("ratio"))
>   Seq(df1, df2).foreach { df =>
>     checkAnswer(
>       df,
>       Seq(Row(1L, 3.0), Row(2L, 1.0), Row(3L, 3.0)))
>     val windows = collect(df.queryExecution.executedPlan) {
>       case w: WindowExec => w
>     }
>     assert(windows.size == 1)
>   }
> }
> {code}
> The query plan for this (_w0#5L and _w1#6L are duplicates):
> {code}
> Window [min(id#2L) windowspecdefinition(_w1#6L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we1#8L], [_w1#6L]
> +- *(4) Sort [_w1#6L ASC NULLS FIRST], false, 0
>    +- AQEShuffleRead coalesced
>       +- ShuffleQueryStage 1
>          +- Exchange hashpartitioning(_w1#6L, 5), ENSURE_REQUIREMENTS, [id=#256]
>             +- *(3) Project [id#2L, _w1#6L, _we0#7L]
>                +- Window [max(id#2L) windowspecdefinition(_w0#5L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#7L], [_w0#5L]
>                   +- *(2) Sort [_w0#5L ASC NULLS FIRST], false, 0
>                      +- AQEShuffleRead coalesced
>                         +- ShuffleQueryStage 0
>                            +- Exchange hashpartitioning(_w0#5L, 5), ENSURE_REQUIREMENTS, [id=#203]
>                               +- *(1) Project [id#2L, (id#2L % 2) AS _w0#5L, (id#2L % 2) AS _w1#6L]
>                                  +- *(1) Range (1, 4, step=1, splits=2)
> {code}
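
Until the analyzer deduplicates these expressions, one possible user-side workaround is to compute the partitioning expression once as an ordinary column and partition every window by that column. The Scala sketch below assumes Spark 3.x with `spark-sql` on the classpath; the object `Spark38282Workaround`, its helper methods, and the `grp` column name are hypothetical and only for illustration. Because both window functions then share a spec keyed on the same attribute reference, the physical plan should contain a single WindowExec and a single exchange rather than two of each.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, min}

// Hypothetical helper object illustrating a workaround, not a fix in Spark itself.
object Spark38282Workaround {
  // Computes max(id) / min(id) per (id % 2) group, but partitions both
  // windows by a pre-computed column instead of by the expression itself.
  def ratios(spark: SparkSession): DataFrame = {
    // Materialize the complex partitioning expression once as a plain column.
    val withGroup = spark.range(1, 4).withColumn("grp", col("id") % 2)
    // Both window functions now use the same spec keyed on an attribute
    // reference, so the expression is not aliased separately per window.
    val w = Window.partitionBy(col("grp"))
    withGroup
      .withColumn("ratio", max("id").over(w) / min("id").over(w))
      .select(col("id"), col("ratio"))
  }

  // Counts WindowExec nodes in the physical plan. This walks executedPlan
  // directly, so it assumes AQE is disabled: with AQE on, the top node is
  // AdaptiveSparkPlanExec, and a plain collect does not see its inner plan.
  def windowExecCount(df: DataFrame): Int =
    df.queryExecution.executedPlan.collect { case w: WindowExec => w }.size

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-38282-workaround")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()
    try {
      val df = ratios(spark)
      df.orderBy("id").show()
      println(s"WindowExec nodes: ${windowExecCount(df)}")
    } finally {
      spark.stop()
    }
  }
}
```

This only sidesteps the duplication from the caller's side; it does not change how Spark treats a complex expression passed directly to Window.partitionBy.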