[ https://issues.apache.org/jira/browse/SPARK-24399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Takeshi Yamamuro resolved SPARK-24399.
--------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.3.1

> Reused Exchange is used where it should not be
> ----------------------------------------------
>
>                 Key: SPARK-24399
>                 URL: https://issues.apache.org/jira/browse/SPARK-24399
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: David Vrba
>            Priority: Critical
>              Labels: correctness
>             Fix For: 2.3.1
>
>
> ReusedExchange produces a wrong result. Here is code to reproduce the
> issue:
> {code:java}
> import org.apache.spark.sql.functions.{sum, lit}
> import org.apache.spark.sql.expressions.Window
> // spark.implicits._ (for toDF and $"...") is pre-imported in spark-shell
> import spark.implicits._
> val row1 = (1, 3, 4, 50)
> val row2 = (2, 2, 2, 250)
> val row3 = (3, 2, 4, 250)
> val row4 = (4, 3, 1, 350)
> val data = Seq(row1, row2, row3, row4)
> // cache() makes both union branches read from the same InMemoryTableScan
> val df = data.toDF("id", "pFilter", "secondFilter", "metricToAgg").cache()
> // The window partitioned by id forces an Exchange into the plan
> val w = Window.partitionBy($"id")
> val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"pFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("first_union_part"))
> // Same pipeline, but filtering on secondFilter instead of pFilter
> val secondUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"secondFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("second_union_part"))
> val finalDF = firstUnionPart.union(secondUnionPart)
> finalDF.show()
> +----------------+-----------------+
> |sum(metricToAgg)|                t|
> +----------------+-----------------+
> |             850| first_union_part|
> |             850|second_union_part|
> +----------------+-----------------+
> {code}
>  
> The second row is wrong: it should be 250 instead of 850, as you can see if
> you show both union parts separately:
> {code:java}
> firstUnionPart.show() 
> +----------------+----------------+ 
> |sum(metricToAgg)|               t| 
> +----------------+----------------+ 
> |             850|first_union_part| 
> +----------------+----------------+
> secondUnionPart.show()
> +----------------+-----------------+
> |sum(metricToAgg)|                t|
> +----------------+-----------------+
> |             250|second_union_part|
> +----------------+-----------------+
> {code}
>  
> ReusedExchange replaced part of the query plan in the second branch of the
> union with the query plan from the first branch, as you can see from the
> output of explain().
> I did some inspection, and it appears that both sub-plans have the same
> canonicalized plan, which is why the ReusedExchange takes place. However, I
> don't think they should have the same canonicalized plan: according to the
> notes in the source code, only plans that evaluate to the same result may
> have the same canonicalized plan. The two sub-plans in this query lead, in
> principle, to different results, because the second branch of the union
> filters on a different column than the first branch.
>  
> An interesting thing happens when we rename the second column from
> "pFilter" to "kFilter". In this case, the query works fine and produces the
> correct result, as you can see here:
> {code:java}
> import org.apache.spark.sql.functions.{sum, lit}
> import org.apache.spark.sql.expressions.Window
> val row1 = (1, 3, 4, 50)
> val row2 = (2, 2, 2, 250)
> val row3 = (3, 2, 4, 250)
> val row4 = (4, 3, 1, 350)
> val data = Seq(row1, row2, row3, row4)
> val df = data.toDF("id", "kFilter", "secondFilter", "metricToAgg").cache()
> val w = Window.partitionBy($"id")
> val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"kFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("first_union_part"))
> val secondUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"secondFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("second_union_part"))
> val finalDF = firstUnionPart.union(secondUnionPart)
> finalDF.show()
> +----------------+-----------------+
> |sum(metricToAgg)|                t|
> +----------------+-----------------+
> |             850| first_union_part|
> |             250|second_union_part|
> +----------------+-----------------+
> {code}
>  
> The result is now correct, and the only thing we changed is the name of one
> column. The ReusedExchange does not happen here, and I checked that the
> canonicalized plans now really differ.
>  
> The key points to reproduce this bug are:
>  # Use union (or another operator with multiple branches).
>  # Use cache() so that the plan contains an InMemoryTableScan.
>  # Use an operator that forces an Exchange in the plan (in this case, a window function call).
>  # Use column names with a particular alphabetical order (renaming "pFilter" to "kFilter" avoids the bug).
>  
>  
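The reporter's argument relies on one rule: two plans may share a canonicalized form only if they compute the same result. The sketch below illustrates that rule with a hypothetical toy model. It is not Spark's Catalyst code, and every name in it (`CanonicalizationSketch`, `Attr`, `Scan`, `Filter`, `CScan`, `CFilter`) is made up for illustration. Canonicalization is modeled as erasing column names and expression ids and replacing each column reference with its ordinal in the child's output. This loosely mirrors the idea of expression-id normalization. Under this rule, a filter on "pFilter" and a filter on "secondFilter" stay distinct, while renaming "pFilter" to "kFilter" does not change the canonical form.

```scala
// Hypothetical toy model for illustration only; this is NOT Spark's Catalyst code.
object CanonicalizationSketch {
  // An attribute has a display name and a unique expression id.
  final case class Attr(name: String, exprId: Long)

  sealed trait Plan { def output: Seq[Attr] }
  final case class Scan(table: String, output: Seq[Attr]) extends Plan
  final case class Filter(col: Attr, values: Set[Int], child: Plan) extends Plan {
    def output: Seq[Attr] = child.output
  }

  // Canonical form: names and expression ids are erased, and each column
  // reference becomes its ordinal position in the child's output.
  sealed trait Canon
  final case class CScan(table: String, width: Int) extends Canon
  final case class CFilter(ordinal: Int, values: Set[Int], child: Canon) extends Canon

  def canonicalize(p: Plan): Canon = p match {
    case Scan(table, out) => CScan(table, out.size)
    case Filter(col, values, child) =>
      val ordinal = child.output.indexWhere(_.exprId == col.exprId)
      require(ordinal >= 0, s"attribute $col not found in child output")
      CFilter(ordinal, values, canonicalize(child))
  }

  // Plans are treated as interchangeable only if their canonical forms match.
  def sameResult(a: Plan, b: Plan): Boolean = canonicalize(a) == canonicalize(b)

  def main(args: Array[String]): Unit = {
    val cols = Seq(Attr("id", 1), Attr("pFilter", 2), Attr("secondFilter", 3), Attr("metricToAgg", 4))
    val scan = Scan("df", cols)
    val first = Filter(cols(1), Set(2, 3), scan)  // pFilter IN (2, 3)
    val second = Filter(cols(2), Set(2, 3), scan) // secondFilter IN (2, 3)
    println(sameResult(first, second)) // false: different columns are filtered

    // Renaming pFilter to kFilter (with fresh expression ids) keeps the same shape.
    val renamed = Seq(Attr("id", 11), Attr("kFilter", 12), Attr("secondFilter", 13), Attr("metricToAgg", 14))
    val firstRenamed = Filter(renamed(1), Set(2, 3), Scan("df", renamed))
    println(sameResult(first, firstRenamed)) // true: only names and ids differ
  }
}
```

In this model, column names cannot influence the result of `sameResult`. The bug reported here is exactly a case where, in Spark 2.3.0, a name change did influence it.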

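The reporter says the canonicalized plans coincide with the "pFilter" naming and differ with the "kFilter" naming. That check can be repeated with a small sketch like the one below, assuming a Spark 2.3 spark-shell session. `compareExchanges` is a hypothetical helper. It relies on `Dataset.queryExecution.executedPlan`, `TreeNode.collect` and `QueryPlan.sameResult`; the last of these compares canonicalized plans.

```scala
// Sketch for a Spark 2.3 spark-shell session; compareExchanges is a hypothetical helper.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.exchange.Exchange

// Collect the Exchange operators of each branch's physical plan (pre-order)
// and check, pair by pair, whether Spark would consider them the same result.
// sameResult compares the canonicalized forms of the two plans.
def compareExchanges(a: DataFrame, b: DataFrame): Seq[Boolean] = {
  val ea = a.queryExecution.executedPlan.collect { case e: Exchange => e }
  val eb = b.queryExecution.executedPlan.collect { case e: Exchange => e }
  // zip assumes both plans have the same shape; unmatched exchanges are ignored
  ea.zip(eb).map { case (x, y) => x.sameResult(y) }
}
```

After running the reproducer in the same session, `compareExchanges(firstUnionPart, secondUnionPart).foreach(println)` prints one Boolean per pair of exchanges. `finalDF.explain()` shows whether a ReusedExchange node was inserted. Pairing by traversal order is a simplification that assumes both branches have the same plan shape. That assumption holds here because the branches differ only in the filtered column.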


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
