[ https://issues.apache.org/jira/browse/SPARK-17154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15807077#comment-15807077 ]
chie hayashida edited comment on SPARK-17154 at 1/7/17 8:17 AM:
----------------------------------------------------------------

[~nsyca], [~cloud_fan], [~sarutak]
I have example code below.

h2. Example 1
{code}
scala> val df = Seq((1,1,1),(1,2,3),(1,4,5),(2,2,4),(2,5,7),(2,8,8)).toDF("id","value1","value2")
df: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 1 more field]

scala> val df2 = df
df2: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 1 more field]

scala> val df3 = df.join(df2, df("id") === df2("id") && df("value2") <= df2("value2"))
17/01/07 16:29:26 WARN Column: Constructing trivially true equals predicate, 'id#171 = id#171'. Perhaps you need to use aliases.
df3: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 4 more fields]

scala> df3.show
+---+------+------+---+------+------+
| id|value1|value2| id|value1|value2|
+---+------+------+---+------+------+
|  1|     1|     1|  1|     4|     5|
|  1|     1|     1|  1|     2|     3|
|  1|     1|     1|  1|     1|     1|
|  1|     2|     3|  1|     4|     5|
|  1|     2|     3|  1|     2|     3|
|  1|     2|     3|  1|     1|     1|
|  1|     4|     5|  1|     4|     5|
|  1|     4|     5|  1|     2|     3|
|  1|     4|     5|  1|     1|     1|
|  2|     2|     4|  2|     8|     8|
|  2|     2|     4|  2|     5|     7|
|  2|     2|     4|  2|     2|     4|
|  2|     5|     7|  2|     8|     8|
|  2|     5|     7|  2|     5|     7|
|  2|     5|     7|  2|     2|     4|
|  2|     8|     8|  2|     8|     8|
|  2|     8|     8|  2|     5|     7|
|  2|     8|     8|  2|     2|     4|
+---+------+------+---+------+------+

scala> df3.explain
== Physical Plan ==
*BroadcastHashJoin [id#171], [id#178], Inner, BuildRight
:- LocalTableScan [id#171, value1#172, value2#173]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#178, value1#179, value2#180]
{code}

h2. Example 2
{code}
scala> val df = Seq((1,1,1),(1,2,3),(1,4,5),(2,2,4),(2,5,7),(2,8,8)).toDF("id","value1","value2")
df: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 1 more field]

scala> val df2 = df.select($"id".as("id2"),$"value1".as("value11"),$"value2".as("value22"))
df2: org.apache.spark.sql.DataFrame = [id2: int, value11: int ... 1 more field]

scala> val df3 = df.join(df2, df("id") === df2("id2") && df("value2") <= df2("value22"))
df3: org.apache.spark.sql.DataFrame = [id: int, value1: int ... 4 more fields]

scala> df3.show
+---+------+------+---+-------+-------+
| id|value1|value2|id2|value11|value22|
+---+------+------+---+-------+-------+
|  1|     1|     1|  1|      4|      5|
|  1|     1|     1|  1|      2|      3|
|  1|     1|     1|  1|      1|      1|
|  1|     2|     3|  1|      4|      5|
|  1|     2|     3|  1|      2|      3|
|  1|     4|     5|  1|      4|      5|
|  2|     2|     4|  2|      8|      8|
|  2|     2|     4|  2|      5|      7|
|  2|     2|     4|  2|      2|      4|
|  2|     5|     7|  2|      8|      8|
|  2|     5|     7|  2|      5|      7|
|  2|     8|     8|  2|      8|      8|
+---+------+------+---+-------+-------+

scala> df3.explain
== Physical Plan ==
*BroadcastHashJoin [id#171], [id2#243], Inner, BuildRight, (value2#173 <= value22#245)
:- LocalTableScan [id#171, value1#172, value2#173]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id2#243, value11#244, value22#245]
{code}

The contents of df3 differ between Example 1 and Example 2, and I think the cause is the same as SPARK-17154. In the case above, the result of Example 1 is incorrect and that of Example 2 is correct: in Example 1, both sides of df("value2") <= df2("value2") resolve to the same attribute (value2#173), so the condition becomes trivially true and is absent from the physical plan. This issue isn't trivial, and a developer may easily overlook such buggy code, so I think a permanent fix should be made for this issue.
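For reference, the WARN message above suggests using aliases; a minimal sketch of that workaround follows (not part of the original comment; the alias names l and r are illustrative, and this assumes a spark-shell session where spark.implicits._ is already in scope for the $ syntax):

{code}
// Sketch of the alias-based workaround hinted at by the WARN message.
// Dataset.as(alias) gives each side of the self-join its own qualifier,
// so the two value2 columns can be disambiguated in the join condition.
val df = Seq((1,1,1),(1,2,3),(1,4,5),(2,2,4),(2,5,7),(2,8,8)).toDF("id","value1","value2")
val joined = df.as("l").join(df.as("r"), $"l.id" === $"r.id" && $"l.value2" <= $"r.value2")
joined.show()  // only rows where the left value2 <= the right value2 should remain
{code}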
> Wrong result can be returned or AnalysisException can be thrown after
> self-join or similar operations
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17154
>                 URL: https://issues.apache.org/jira/browse/SPARK-17154
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2, 2.0.0
>            Reporter: Kousuke Saruta
>         Attachments: Name-conflicts-2.pdf, Solution_Proposal_SPARK-17154.pdf
>
>
> When we join two DataFrames that originate from the same DataFrame,
> operations on the joined DataFrame can fail.
> One reproducible example is as follows.
> {code}
> val df = Seq(
>   (1, "a", "A"),
>   (2, "b", "B"),
>   (3, "c", "C"),
>   (4, "d", "D"),
>   (5, "e", "E")).toDF("col1", "col2", "col3")
>
> val filtered = df.filter("col1 != 3").select("col1", "col2")
> val joined = filtered.join(df, filtered("col1") === df("col1"), "inner")
> val selected1 = joined.select(df("col3"))
> {code}
> In this case, an AnalysisException is thrown.
> Another example is as follows.
> {code}
> val df = Seq(
>   (1, "a", "A"),
>   (2, "b", "B"),
>   (3, "c", "C"),
>   (4, "d", "D"),
>   (5, "e", "E")).toDF("col1", "col2", "col3")
>
> val filtered = df.filter("col1 != 3").select("col1", "col2")
> val rightOuterJoined = filtered.join(df, filtered("col1") === df("col1"), "right")
> val selected2 = rightOuterJoined.select(df("col1"))
> selected2.show
> {code}
> In this case, we expect to get an answer like the following.
> {code}
> 1
> 2
> 3
> 4
> 5
> {code}
> But the actual result is as follows.
> {code}
> 1
> 2
> null
> 4
> 5
> {code}
> The cause of the problems in these examples is that the logical plan of the
> right-side DataFrame and the expressions of its output are re-created by the
> analyzer (in the ResolveReferences rule) when the two sides of the join have
> expressions that share the same exprId.
> The re-created expressions are equal to the original ones except for the exprId.
> This happens when we do a self-join or a similar pattern of operations.
> In the first example, df("col3") returns a Column which contains an
> expression, and that expression has an exprId (say id1 here).
> After the join, the expression held by the right-side DataFrame (df) is
> re-created; the old and new expressions are equal except that the exprId is
> renewed (say id2 for the new exprId here).
> Because of the mismatch between those exprIds, an AnalysisException is thrown.
> In the second example, df("col1") returns a Column whose expression is
> assigned an exprId (say id3).
> On the other hand, the Column returned by filtered("col1") has an expression
> with the same exprId (id3).
> After the join, the expressions in the right-side DataFrame are re-created, so
> the expression assigned id3 is no longer present on the right side but is
> still present on the left side.
> So, when we refer to df("col1") on the joined DataFrame, we actually get col1
> of the left side, which includes null.
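For anyone digging into the second example, here is a minimal sketch (not from the original report) for observing the exprId mismatch described above; it relies only on the public Column.expr and Dataset.queryExecution accessors, and the printed attribute IDs (#N) vary per session:

{code}
// Reproduce the right-outer-join example and inspect the analyzed plan.
val df = Seq(
  (1, "a", "A"), (2, "b", "B"), (3, "c", "C"),
  (4, "d", "D"), (5, "e", "E")).toDF("col1", "col2", "col3")
val filtered = df.filter("col1 != 3").select("col1", "col2")
val joined = filtered.join(df, filtered("col1") === df("col1"), "right")

println(df("col1").expr)                // the exprId originally captured by df("col1")
println(joined.queryExecution.analyzed) // right-side attributes carry renewed exprIds
{code}

Comparing the two printouts shows that the exprId captured by df("col1") survives only on the left side of the analyzed join, which is why selecting it returns the null-extended column.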