[ https://issues.apache.org/jira/browse/SPARK-24399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493121#comment-16493121 ]
Takeshi Yamamuro commented on SPARK-24399:
------------------------------------------

I checked on master and branch-2.3, and this issue has already been fixed there. Also, the vote for the v2.3.1 release has started, so it will be available soon.
http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Spark-2-3-1-RC2-td24020.html

> Reused Exchange is used where it should not be
> ----------------------------------------------
>
>                 Key: SPARK-24399
>                 URL: https://issues.apache.org/jira/browse/SPARK-24399
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: David Vrba
>            Priority: Critical
>              Labels: correctness
>
> A ReusedExchange produces a wrong result. Here is the code to reproduce the issue:
> {code:java}
> import org.apache.spark.sql.functions.{sum, lit}
> import org.apache.spark.sql.expressions.Window
>
> val row1 = (1, 3, 4, 50)
> val row2 = (2, 2, 2, 250)
> val row3 = (3, 2, 4, 250)
> val row4 = (4, 3, 1, 350)
> val data = Seq(row1, row2, row3, row4)
> val df = data.toDF("id", "pFilter", "secondFilter", "metricToAgg").cache()
>
> val w = Window.partitionBy($"id")
>
> val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"pFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("first_union_part"))
>
> val secondUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"secondFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("second_union_part"))
>
> val finalDF = firstUnionPart.union(secondUnionPart)
> finalDF.show()
>
> +----------------+-----------------+
> |sum(metricToAgg)|                t|
> +----------------+-----------------+
> |             850| first_union_part|
> |             850|second_union_part|
> +----------------+-----------------+
> {code}
> The second row is wrong: it should be 250 instead of 850, as you can see if you show both union parts separately:
> {code:java}
> firstUnionPart.show()
>
> +----------------+----------------+
> |sum(metricToAgg)|               t|
> +----------------+----------------+
> |             850|first_union_part|
> +----------------+----------------+
>
> secondUnionPart.show()
>
> +----------------+-----------------+
> |sum(metricToAgg)|                t|
> +----------------+-----------------+
> |             250|second_union_part|
> +----------------+-----------------+
> {code}
> The ReusedExchange replaced part of the query plan in the second branch of the union with the query plan from the first branch, as you can see from the explain() output.
> I did some inspection, and it appears that both sub-plans have the same canonicalized plans, which is why the ReusedExchange takes place. But I don't think they should have the same canonicalized plan: according to the notes in the source code, only plans that evaluate to the same result may share a canonicalized plan, and the two sub-plans in this query lead in principle to different results, because the second union branch filters on a different column than the first.
>
> An interesting thing happens when we change the name of the second column from "pFilter" to "kFilter".
> In this case the query works fine and produces the correct result, as you can see here:
> {code:java}
> import org.apache.spark.sql.functions.{sum, lit}
> import org.apache.spark.sql.expressions.Window
>
> val row1 = (1, 3, 4, 50)
> val row2 = (2, 2, 2, 250)
> val row3 = (3, 2, 4, 250)
> val row4 = (4, 3, 1, 350)
> val data = Seq(row1, row2, row3, row4)
> val df = data.toDF("id", "kFilter", "secondFilter", "metricToAgg").cache()
>
> val w = Window.partitionBy($"id")
>
> val firstUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"kFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("first_union_part"))
>
> val secondUnionPart = df.withColumn("activity_sum", sum("metricToAgg").over(w))
>   .filter($"activity_sum" > 50)
>   .filter($"secondFilter".isin(2, 3))
>   .agg(sum($"metricToAgg"))
>   .withColumn("t", lit("second_union_part"))
>
> val finalDF = firstUnionPart.union(secondUnionPart)
> finalDF.show()
>
> +----------------+-----------------+
> |sum(metricToAgg)|                t|
> +----------------+-----------------+
> |             850| first_union_part|
> |             250|second_union_part|
> +----------------+-----------------+
> {code}
> The result is now correct, and the only thing we changed is the name of one column. The ReusedExchange does not happen here, and I checked that the canonicalized plans now really differ.
>
> The key points to reproduce this bug are:
> # Use a union (or some other operator with multiple branches)
> # Use cache so that the plan contains an InMemoryTableScan
> # Use an operator that forces an Exchange in the plan (in this case a window function call)
> # Use column names that happen to have a specific alphabetical order

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
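As a follow-up note for anyone hitting this on 2.3.0 before upgrading: the symptom is visible in the physical plan, and exchange reuse can be switched off as a stopgap. The sketch below is for spark-shell and reuses firstUnionPart, secondUnionPart, and finalDF from the reproduction above; spark.sql.exchange.reuse is an internal Spark SQL flag rather than a documented API, so treat that part as an assumption that may change between versions.

{code:java}
// Symptom check: on an affected build, explain() shows a ReusedExchange node
// standing in for the second union branch's exchange.
finalDF.explain()

// Stopgap (assumes the internal flag spark.sql.exchange.reuse): disable
// exchange reuse so each branch keeps its own exchange.
spark.conf.set("spark.sql.exchange.reuse", "false")

// Rebuild the union after changing the flag so it gets planned fresh;
// an already-materialized DataFrame keeps its cached physical plan.
val rebuiltDF = firstUnionPart.union(secondUnionPart)
rebuiltDF.show()
{code}

On 2.3.1 and later this workaround should be unnecessary, per the comment above.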