[ https://issues.apache.org/jira/browse/SPARK-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Liang-Chi Hsieh updated SPARK-6550:
-----------------------------------
    Description: 
h2. Problems

In some cases, expressions in a logical plan are replaced with new ones during analysis, e.g. when handling self-joins. Expressions that are resolved against the analyzed plan therefore refer to the new expression ids, not the original ones. However, DataFrame transformations such as {{groupBy}} and aggregation build the new DataFrame on top of the unanalyzed logical plan. In such cases, the expressions in the resulting DataFrame are inconsistent with that plan. There are two problems:

# Expression ids in the logical plan can become inconsistent if they are changed during analysis and some expressions are resolved afterwards.

When we run the following code:
{code}
// Assumes a spark-shell session, where sqlContext is predefined
import sqlContext.implicits._ // for toDF and the $"..." column syntax

val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").min("y.int")
{code}

Because {{groupBy}} and {{min}} resolve their columns against the analyzed logical plan, their expression ids refer to the analyzed plan instead of the logical plan. As a result, the logical plan of {{df2}} looks like this:
{code}
'Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 'Join Inner, Some(('x.str = 'y.str))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
{code}

As you can see, the expression ids in {{Aggregate}} ({{str#5}}, {{int#4}}) differ from the ones produced by {{Subquery y}} ({{int#2}}, {{str#3}}): {{str#5}} and {{int#4}} are the ids that {{Subquery y}} received when the join itself was analyzed. This is the first problem.

# {{df2}} cannot be executed

The logical plan of {{df2}} shown above cannot be executed.
Because the expression ids of {{Subquery y}} are rewritten again by the self-join handling during analysis, the analyzed plan of {{df2}} becomes:
{code}
Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 Join Inner, Some((str#3 = str#8))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#7,_2#1 AS str#8]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
{code}

The expressions referenced in {{Aggregate}} ({{str#5}}, {{int#4}}) do not match those produced by {{Subquery y}} ({{int#7}}, {{str#8}}). This is the second problem.

h2. Proposed solution

We propose adding a {{PreAnalyzer}}. When a logical plan {{rawPlan}} is given to {{SQLContext}}, {{SQLContext}} uses the {{PreAnalyzer}} to modify the plan before assigning it to {{QueryExecution.logical}}. Later operations are then based on the pre-analyzed logical plan instead of the original {{rawPlan}}.

> Add PreAnalyzer to keep logical plan consistent across DataFrame
> ----------------------------------------------------------------
>
>                 Key: SPARK-6550
>                 URL: https://issues.apache.org/jira/browse/SPARK-6550
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Liang-Chi Hsieh
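The id drift behind the second problem, and the pre-analysis idea from the proposed solution, can be illustrated with a deliberately simplified Scala model that does not depend on Spark. All names in it ({{Attr}}, {{Plan}}, {{Frame}}, {{preAnalyze}}, {{dangling}}) are hypothetical and are not Spark classes or methods. It assumes that pre-analysis does nothing more than apply the self-join id rewriting once, before the plan is stored as the logical plan; the actual {{PreAnalyzer}} may do more.

```scala
import java.util.concurrent.atomic.AtomicLong

// Simplified model of SPARK-6550. Every name here is hypothetical and is
// not part of Spark's API.
object PreAnalyzerSketch {
  private val ids = new AtomicLong(0)

  final case class Attr(name: String, id: Long)
  def newAttr(name: String): Attr = Attr(name, ids.getAndIncrement())

  sealed trait Plan { def output: Seq[Attr] }
  final case class Relation(output: Seq[Attr]) extends Plan
  final case class Join(left: Plan, right: Relation) extends Plan {
    def output: Seq[Attr] = left.output ++ right.output
  }
  // Refers to attributes by id, like [str#5] in the issue's Aggregate.
  final case class Aggregate(child: Plan, refs: Seq[Attr]) extends Plan {
    def output: Seq[Attr] = refs
  }

  // Stand-in for the analyzer's self-join handling: if the right side of a
  // join reuses ids from the left side, it gets fresh ids. On a raw self-join
  // this mints new ids on every call.
  def analyze(plan: Plan): Plan = plan match {
    case Join(l, r) =>
      val left = analyze(l)
      if (r.output.exists(a => left.output.contains(a)))
        Join(left, Relation(r.output.map(a => newAttr(a.name))))
      else Join(left, r)
    case Aggregate(child, refs) => Aggregate(analyze(child), refs)
    case rel: Relation          => rel
  }

  // Assumed pre-analysis: run the id-rewriting step once, up front.
  def preAnalyze(raw: Plan): Plan = analyze(raw)

  // Ids an Aggregate refers to that its analyzed child does not produce.
  def dangling(plan: Plan): Seq[Attr] = analyze(plan) match {
    case Aggregate(child, refs) => refs.filterNot(a => child.output.contains(a))
    case _                      => Seq.empty
  }

  // Stand-in for a DataFrame; `logical` plays the role of QueryExecution.logical.
  final class Frame(val logical: Plan) {
    // Like groupBy in the issue: resolve against the analyzed plan, then build
    // the new plan on top of the logical plan. Takes the right-most match,
    // i.e. the column from the right side of the join.
    def groupBy(name: String): Frame = {
      val attr = analyze(logical).output.filter(_.name == name).last
      new Frame(Aggregate(logical, Seq(attr)))
    }
  }

  // Builds a self-join, optionally pre-analyzing it before it is stored.
  def selfJoin(usePreAnalyzer: Boolean): Frame = {
    val rel = Relation(Seq(newAttr("int"), newAttr("str")))
    val raw = Join(rel, rel)
    new Frame(if (usePreAnalyzer) preAnalyze(raw) else raw)
  }

  def main(args: Array[String]): Unit = {
    val broken = selfJoin(usePreAnalyzer = false).groupBy("str")
    val fixed  = selfJoin(usePreAnalyzer = true).groupBy("str")
    println(s"without pre-analysis, dangling ids: ${dangling(broken.logical)}")
    println(s"with pre-analysis, dangling ids: ${dangling(fixed.logical)}")
  }
}
```

Without pre-analysis, the id that {{groupBy}} resolved is missing from the re-analyzed child, which mirrors the second problem above. With pre-analysis, re-analysis finds no conflicting ids and leaves the plan, and therefore the resolved id, unchanged.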