[ 
https://issues.apache.org/jira/browse/SPARK-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Liang-Chi Hsieh updated SPARK-6550:
-----------------------------------
    Description: 
h2. Problems

In some cases, the expressions in a logical plan will be modified to new ones 
during analysis, e.g. the handling for self-join cases. If some expressions are 
resolved based on the analyzed plan, they are referring to changed expression 
ids, not original ids.

But the transformation of DataFrame will use logical plan to construct new 
DataFrame, e.g. {{groupBy}} and aggregation. So in such cases, the expressions 
in these DataFrames will be inconsistent.

The problems are specified as following:

# Expression ids in logical plan are possibly inconsistent if expression ids 
are changed during analysis and some expressions are resolved after that

When we try to run the following codes:
{code}
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === 
$"y.str").groupBy("y.str").min("y.int")
{code}

Because {{groupBy}} and {{min}} will perform resolving based on the analyzed 
logical plan, their expression ids refer to analyzed plan, instead of logical 
plan.

So the logical plan of df2 looks like:

{code}
'Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 'Join Inner, Some(('x.str = 'y.str))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
{code}

As you see, the expression ids in {{Aggregate}} are different to the expression 
ids in {{Subquery y}}. This is the first problem.

# The {{df2}} can't be performed

The showing logical plan of {{df2}} can't be performed. Because the expression 
ids of {{Subquery y}} will be modified for self-join handling during analysis, 
the analyzed plan of {{df2}} becomes:

{code}
Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 Join Inner, Some((str#3 = str#8))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#7,_2#1 AS str#8]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
{code}

The expressions referred in {{Aggregate}} are not matching to these in 
{{Subquery y}}. This is the second problem.

h2. Proposed solution

We try to add a {{PreAnalyzer}}. When a logical plan {{rawPlan}} is given to 
SQLContext, it uses PreAnalyzer to modify the logical plan before assigning to 
{{QueryExecution.logical}}. Then later operations will based on the 
pre-analyzed logical plan, instead of the original {{rawPlan}}.


  was:
h2. Problems

In some cases, the expressions in a logical plan will be modified to new ones 
during analysis, e.g. the handling for self-join cases. If some expressions are 
resolved based on the analyzed plan, they are referring to changed expression 
ids, not original ids.

But the transformation of DataFrame will use logical plan to construct new 
DataFrame, e.g. {{groupBy}} and aggregation. So in such cases, the expressions 
in these DataFrames will be inconsistent.

The problems are specified as following:

# Expression ids in logical plan are possibly inconsistent if expression ids 
are changed during analysis and some expressions are resolved after that

When we try to run the following codes:
{code}
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === 
$"y.str").groupBy("y.str").min("y.int")
{code}

Because {{groupBy}} and {{min}} will perform resolving based on the analyzed 
logical plan, their expression ids refer to analyzed plan, instead of logical 
plan.

So the logical plan of df2 looks like:

{code}
'Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 'Join Inner, Some(('x.str = 'y.str))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
{code}

As you see, the expression ids in {{Aggregate}} are different to the expression 
ids in {{Subquery y}}. This is the first problem.

# The {{df2}} can't be performed

The showing logical plan of {{df2}} can't be performed. Because the expression 
ids of {{Subquery y}} will be modified for self-join handling during analysis, 
the analyzed plan of {{df2}} becomes:

{code}
Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 Join Inner, Some((str#3 = str#8))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#7,_2#1 AS str#8]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
{code}

The expressions referred in {{Aggregate}} are not matching to these in 
{{Subquery y}}. This is the second problem.

h2. Proposed solution

We try to add a PreAnalyzer. When a logical plan {{rawPlan}} is given to 
SQLContext, it uses PreAnalyzer to modify the logical plan before assigning to 
{{QueryExecution.logical}}. Then later operations will based on the 
pre-analyzed logical plan, instead of the original {{rawPlan}}.



> Add PreAnalyzer to keep logical plan consistent across DataFrame
> ----------------------------------------------------------------
>
>                 Key: SPARK-6550
>                 URL: https://issues.apache.org/jira/browse/SPARK-6550
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Liang-Chi Hsieh
>
> h2. Problems
> In some cases, the expressions in a logical plan will be modified to new ones 
> during analysis, e.g. the handling for self-join cases. If some expressions 
> are resolved based on the analyzed plan, they are referring to changed 
> expression ids, not original ids.
> But the transformation of DataFrame will use logical plan to construct new 
> DataFrame, e.g. {{groupBy}} and aggregation. So in such cases, the 
> expressions in these DataFrames will be inconsistent.
> The problems are specified as following:
> # Expression ids in logical plan are possibly inconsistent if expression ids 
> are changed during analysis and some expressions are resolved after that
> When we try to run the following codes:
> {code}
> val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
> val df2 = df.as('x).join(df.as('y), $"x.str" === 
> $"y.str").groupBy("y.str").min("y.int")
> {code}
> Because {{groupBy}} and {{min}} will perform resolving based on the analyzed 
> logical plan, their expression ids refer to analyzed plan, instead of logical 
> plan.
> So the logical plan of df2 looks like:
> {code}
> 'Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
>  'Join Inner, Some(('x.str = 'y.str))
>   Subquery x
>    Project [_1#0 AS int#2,_2#1 AS str#3]
>     LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
>   Subquery y
>    Project [_1#0 AS int#2,_2#1 AS str#3]
>     LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
> {code}
> As you see, the expression ids in {{Aggregate}} are different to the 
> expression ids in {{Subquery y}}. This is the first problem.
> # The {{df2}} can't be performed
> The showing logical plan of {{df2}} can't be performed. Because the 
> expression ids of {{Subquery y}} will be modified for self-join handling 
> during analysis, the analyzed plan of {{df2}} becomes:
> {code}
> Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
>  Join Inner, Some((str#3 = str#8))
>   Subquery x
>    Project [_1#0 AS int#2,_2#1 AS str#3]
>     LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
>   Subquery y
>    Project [_1#0 AS int#7,_2#1 AS str#8]
>     LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
> {code}
> The expressions referred in {{Aggregate}} are not matching to these in 
> {{Subquery y}}. This is the second problem.
> h2. Proposed solution
> We try to add a {{PreAnalyzer}}. When a logical plan {{rawPlan}} is given to 
> SQLContext, it uses PreAnalyzer to modify the logical plan before assigning 
> to {{QueryExecution.logical}}. Then later operations will based on the 
> pre-analyzed logical plan, instead of the original {{rawPlan}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to