[ https://issues.apache.org/jira/browse/SPARK-45543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775566#comment-17775566 ]
Jiaan Geng commented on SPARK-45543:
------------------------------------

[~ronserruya] I see.

> WindowGroupLimit Causes incorrect results when multiple windows are used
> ------------------------------------------------------------------------
>
>                 Key: SPARK-45543
>                 URL: https://issues.apache.org/jira/browse/SPARK-45543
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, Spark Core, SQL
>    Affects Versions: 3.5.0
>            Reporter: Ron Serruya
>            Priority: Critical
>              Labels: correctness, data-loss
>
> First, this is my first bug report, so I hope I'm doing it right; also, as I'm not
> very knowledgeable about Spark internals, I hope I diagnosed the problem correctly.
> I found this regression in Spark version 3.5.0:
> When using multiple windows that share the same partitioning and ordering (but
> with different frame boundaries), where one window is a ranking function,
> "WindowGroupLimit" is added to the plan, causing wrong values to be produced
> by the other windows.
> *This behavior didn't exist in versions 3.3 and 3.4.*
> Example:
> {code:python}
> import pyspark
> from pyspark.sql import functions as F, Window
>
> df = spark.createDataFrame([
>     {'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
>     {'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
>     {'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
>     {'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
> ])
>
> # Create first window for row number
> window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))
>
> # Create additional window from the first window with an unbounded frame
> unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
>
> # Try to keep the first row by year, and also collect all scores into a list
> df2 = df.withColumn(
>     'rn',
>     F.row_number().over(window_spec)
> ).withColumn(
>     'all_scores',
>     F.collect_list('score').over(unbound_spec)
> )
> {code}
> So far everything works, and if we display df2:
> {noformat}
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1     |3    |2023|1  |[3, 2, 1] |
> |Dave|1     |2    |2022|2  |[3, 2, 1] |
> |Dave|1     |1    |2020|3  |[3, 2, 1] |
> |Amy |2     |6    |2021|1  |[6]       |
> +----+------+-----+----+---+----------+
> {noformat}
> However, once we filter to keep only the first row number:
> {noformat}
> df2.filter("rn=1").show(truncate=False)
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1     |3    |2023|1  |[3]       |
> |Amy |2     |6    |2021|1  |[6]       |
> +----+------+-----+----+---+----------+
> {noformat}
> As you can see, just filtering changed the "all_scores" array for Dave.
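> For completeness, a small self-check sketch (not in the original report; it reuses
> the df2 defined above, and the expected values are copied from the unfiltered output).
> On an affected 3.5.0 build the assertion fails because Dave's list collapses to [3]:
> {code:python}
> # Expected per-group score lists, taken from the unfiltered df2 output above.
> expected = {('Dave', 1): [3, 2, 1], ('Amy', 2): [6]}
> # Actual per-group score lists after filtering on the rank column.
> actual = {
>     (row['name'], row['row_id']): row['all_scores']
>     for row in df2.filter("rn = 1").collect()
> }
> assert actual == expected, f"rows were dropped before collect_list: {actual}"
> {code}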
> (This example uses `collect_list`; however, the same result happens with other
> functions, such as max, min, count, etc.)
> Now, if instead of the two windows used above I use the first window together with
> a window that has a different ordering, or create a completely new window with the
> same partitioning but no ordering, it works fine:
> {code:python}
> new_window = Window.partitionBy('row_id', 'name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
> df3 = df.withColumn(
>     'rn',
>     F.row_number().over(window_spec)
> ).withColumn(
>     'all_scores',
>     F.collect_list('score').over(new_window)
> )
> df3.filter("rn=1").show(truncate=False)
> {code}
> {noformat}
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1     |3    |2023|1  |[3, 2, 1] |
> |Amy |2     |6    |2021|1  |[6]       |
> +----+------+-----+----+---+----------+
> {noformat}
> In addition, if we use all 3 windows to create 3 different columns, it also works
> fine. So it seems the issue happens only when all the windows used share the same
> partitioning and ordering.
> Here is the final plan for the faulty dataframe:
> {noformat}
> df2.filter("rn=1").explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Filter (rn#9 = 1)
>    +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
>       +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Final
>          +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
>             +- Exchange hashpartitioning(row_id#1L, name#0, 200), ENSURE_REQUIREMENTS, [plan_id=425]
>                +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], row_number(), 1, Partial
>                   +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L DESC NULLS LAST], false, 0
>                      +- Scan ExistingRDD[name#0,row_id#1L,score#2L,year#3L]
> {noformat}
> I suspect the issue is caused by the "WindowGroupLimit" node in the plan.
> This node doesn't appear in the plans of the dataframes that work fine, nor before
> filtering on rn.
> So I assume that because the optimizer detects that we only want to keep the first
> row of the ranking function, it removes all other rows before the following
> computations, which causes the incorrect result or loss of data.
> I think the bug comes from this change (and the attached PRs):
> https://issues.apache.org/jira/browse/SPARK-44340
> It was added in Spark 3.5.0. In addition, I noticed that it was included in
> Databricks release 13.3, which ships Spark 3.4.0 but also lists this change in its
> release notes. And evidently, this bug happens on Databricks 13.3 (Spark 3.4), but
> not on my local Spark 3.4.
> Tagging user [~beliefer] as I believe you would know most about this.
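> Until this is resolved, a possible mitigation sketch is to exclude the suspected
> optimizer rule via spark.sql.optimizer.excludedRules. This assumes the limit is
> injected by the rule org.apache.spark.sql.catalyst.optimizer.InferWindowGroupLimit
> and that Spark allows excluding it; please verify against your build:
> {code:python}
> # Assumption: InferWindowGroupLimit is the rule that inserts WindowGroupLimit
> # and it is excludable in this build.
> spark.conf.set(
>     "spark.sql.optimizer.excludedRules",
>     "org.apache.spark.sql.catalyst.optimizer.InferWindowGroupLimit",
> )
> # WindowGroupLimit should no longer appear in the physical plan,
> # and all_scores should keep all rows per group.
> df2.filter("rn = 1").explain()
> df2.filter("rn = 1").show(truncate=False)
> {code}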