[ 
https://issues.apache.org/jira/browse/SPARK-45543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775566#comment-17775566
 ] 

Jiaan Geng commented on SPARK-45543:
------------------------------------

[~ronserruya]I see.

> WindowGroupLimit Causes incorrect results when multiple windows are used
> ------------------------------------------------------------------------
>
>                 Key: SPARK-45543
>                 URL: https://issues.apache.org/jira/browse/SPARK-45543
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, Spark Core, SQL
>    Affects Versions: 3.5.0
>            Reporter: Ron Serruya
>            Priority: Critical
>              Labels: correctness, data-loss
>
> First, it's my first bug, so I'm hoping I'm doing it right, also, as I'm not 
> very knowledgeable about spark internals, I hope I diagnosed the problem 
> correctly
> I found the degradation in spark version 3.5.0:
> When using multiple windows that share the same partition and ordering (but 
> with different "frame boundaries", where one window is a ranking function, 
> "WindowGroupLimit" is added to the plan causing wrong values to be created 
> from the other windows.
> *This behavior didn't exist in versions 3.3 and 3.4.*
> Example:
>  
> {code:python}
> import pysparkfrom pyspark.sql import functions as F, Window  
> df = spark.createDataFrame([
>     {'row_id': 1, 'name': 'Dave', 'score': 1, 'year': 2020},
>     {'row_id': 1, 'name': 'Dave', 'score': 2, 'year': 2022},
>     {'row_id': 1, 'name': 'Dave', 'score': 3, 'year': 2023},
>     {'row_id': 2, 'name': 'Amy', 'score': 6, 'year': 2021},
> ])
> # Create first window for row number
> window_spec = Window.partitionBy('row_id', 'name').orderBy(F.desc('year'))
> # Create additional window from the first window with unbounded frame
> unbound_spec = window_spec.rowsBetween(Window.unboundedPreceding, 
> Window.unboundedFollowing)
> # Try to keep the first row by year, and also collect all scores into a list
> df2 = df.withColumn(
>     'rn', 
>     F.row_number().over(window_spec)
> ).withColumn(
>     'all_scores', 
>     F.collect_list('score').over(unbound_spec)
> ){code}
> So far everything works, and if we display df2:
>  
> {noformat}
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1     |3    |2023|1  |[3, 2, 1] |
> |Dave|1     |2    |2022|2  |[3, 2, 1] |
> |Dave|1     |1    |2020|3  |[3, 2, 1] |
> |Amy |2     |6    |2021|1  |[6]       |
> +----+------+-----+----+---+----------+{noformat}
>  
> However, once we filter to keep only the first row number:
>  
> {noformat}
> df2.filter("rn=1").show(truncate=False)
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1     |3    |2023|1  |[3]       |
> |Amy |2     |6    |2021|1  |[6]       |
> +----+------+-----+----+---+----------+{noformat}
> As you can see just filtering changed the "all_scores" array for Dave.
> (This example uses `collect_list`, however, the same result happens with 
> other functions, such as max, min, count, etc)
>  
> Now, if instead of using the two windows we used, I will use the first window 
> and a window with different ordering, or create a completely new window with 
> same partition but no ordering, it will work fine:
> {code:python}
> new_window = Window.partitionBy('row_id', 
> 'name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
> df3 = df.withColumn(
>     'rn',
>     F.row_number().over(window_spec)
> ).withColumn(
>     'all_scores',
>     F.collect_list('score').over(new_window)
> )
> df3.filter("rn=1").show(truncate=False){code}
> {noformat}
> +----+------+-----+----+---+----------+
> |name|row_id|score|year|rn |all_scores|
> +----+------+-----+----+---+----------+
> |Dave|1     |3    |2023|1  |[3, 2, 1] |
> |Amy |2     |6    |2021|1  |[6]       |
> +----+------+-----+----+---+----------+
> {noformat}
> In addition, if we use all 3 windows to create 3 different columns, it will 
> also work ok. So it seems the issue happens only when all the windows used 
> share the same partition and ordering.
> Here is the final plan for the faulty dataframe:
> {noformat}
> df2.filter("rn=1").explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Filter (rn#9 = 1)
>    +- Window [row_number() windowspecdefinition(row_id#1L, name#0, year#3L 
> DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
> currentrow$())) AS rn#9, collect_list(score#2L, 0, 0) 
> windowspecdefinition(row_id#1L, name#0, year#3L DESC NULLS LAST, 
> specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
> AS all_scores#16], [row_id#1L, name#0], [year#3L DESC NULLS LAST]
>       +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS LAST], 
> row_number(), 1, Final
>          +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, year#3L 
> DESC NULLS LAST], false, 0
>             +- Exchange hashpartitioning(row_id#1L, name#0, 200), 
> ENSURE_REQUIREMENTS, [plan_id=425]
>                +- WindowGroupLimit [row_id#1L, name#0], [year#3L DESC NULLS 
> LAST], row_number(), 1, Partial
>                   +- Sort [row_id#1L ASC NULLS FIRST, name#0 ASC NULLS FIRST, 
> year#3L DESC NULLS LAST], false, 0
>                      +- Scan 
> ExistingRDD[name#0,row_id#1L,score#2L,year#3L]{noformat}
> I suspect the issue is caused due to the "WindowGroupLimit" clause in the 
> plan.
> This clause doesn't appear in the dataframes that work fine, and before 
> filtering the rn.
> So I assume that since the optimizer detects that we want to only keep the 
> first row of the ranking function, it first removes all other rows from the 
> following computations, which causes the incorrect result or loss of data.
> I think the bug comes from this change (and the attached PRs):
> https://issues.apache.org/jira/browse/SPARK-44340
> It was added in spark 3.5.0, and in addition, I noticed that it was included 
> in databricks release 13.3, which included spark 3.4.0, but also this fix in 
> their release note. And evidently, this bug happens on databricks13 spark3.4, 
> but not on my local spark 3.4
> tagging user [~beliefer] as I believe you would know most about this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to