GitHub user sarutak opened a pull request:
https://github.com/apache/spark/pull/14719
[SPARK-17154][SQL] Wrong result can be returned or AnalysisException can be
thrown after self-join or similar operations
## What changes were proposed in this pull request?
When we join two DataFrames that are derived from the same DataFrame,
operations on the joined DataFrame can fail or return wrong results.
One reproducible example is as follows.
{code}
val df = Seq(
(1, "a", "A"),
(2, "b", "B"),
(3, "c", "C"),
(4, "d", "D"),
(5, "e", "E")).toDF("col1", "col2", "col3")
val filtered = df.filter("col1 != 3").select("col1", "col2")
val joined = filtered.join(df, filtered("col1") === df("col1"), "inner")
val selected1 = joined.select(df("col3"))
{code}
In this case, an AnalysisException is thrown.
Another example is as follows.
{code}
val df = Seq(
(1, "a", "A"),
(2, "b", "B"),
(3, "c", "C"),
(4, "d", "D"),
(5, "e", "E")).toDF("col1", "col2", "col3")
val filtered = df.filter("col1 != 3").select("col1", "col2")
val rightOuterJoined = filtered.join(df, filtered("col1") === df("col1"),
"right")
val selected2 = rightOuterJoined.select(df("col1"))
selected2.show
{code}
In this case, we expect to get the following result.
{code}
1
2
3
4
5
{code}
But the actual result is as follows.
{code}
1
2
null
4
5
{code}
The cause of the problems in both examples is that the logical plan of the
right-side DataFrame and the expressions in its output are re-created in
the analyzer (by the ResolveReferences rule) when the two sides of a join
contain expressions that share the same exprId.
The re-created expressions are equal to the original ones except for their exprId.
This happens when we do a self-join or a similar operation.
In the first example, df("col3") returns a Column that wraps an
expression, and that expression has an exprId (say id1 here).
After the join, the expression belonging to the right-side DataFrame (df) is
re-created. The old and new expressions are equal, except that the exprId is
renewed (say id2 for the new exprId here).
Because id1 no longer matches any exprId in the joined DataFrame, an
AnalysisException is thrown.
In the second example, df("col1") returns a Column, and the expression
contained in that Column is assigned an exprId (say id3).
The Column returned by filtered("col1") has an expression
with the same exprId (id3).
After the join, the expressions in the right-side DataFrame are re-created, so
the expression assigned id3 is no longer present on the right side but is
still present on the left side.
So, when we refer to df("col1") on the joined DataFrame, we get col1 of the
left side, which contains null for the row that has no match.
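The two failure modes described above can be illustrated with a small model that does not depend on Spark. This is only a sketch under assumptions: `Attr`, `resolve`, and the `ExprIdModel` object are hypothetical names, not Spark classes, and the numeric ids are invented (1, 2, and 3 stand for id1, id2, and id3; the others are arbitrary). The real analyzer is considerably more involved.

```scala
object ExprIdModel {
  // Hypothetical stand-in for an attribute: a name plus a unique exprId.
  final case class Attr(name: String, exprId: Long)

  // Resolve a previously captured reference against a plan's output by exprId only.
  def resolve(ref: Attr, output: Seq[Attr]): Option[Attr] =
    output.find(_.exprId == ref.exprId)

  // References captured before the join: df("col3") has id1, df("col1") has id3.
  val dfCol3 = Attr("col3", 1L)
  val dfCol1 = Attr("col1", 3L)

  // filtered is derived from df, so its col1 still carries id3.
  val leftOutput = Seq(Attr("col1", 3L), Attr("col2", 4L))
  // Right side after its attributes were re-created with fresh ids (col3 gets id2).
  val rightOutput = Seq(Attr("col1", 13L), Attr("col2", 14L), Attr("col3", 2L))
  val joinedOutput = leftOutput ++ rightOutput

  // First example: id1 is gone from the joined output, so resolution fails.
  val first = resolve(dfCol3, joinedOutput)

  // Second example: id3 survives only on the left side, so the left col1 is picked.
  val second = resolve(dfCol1, joinedOutput)
  val pickedLeft = second.exists(a => leftOutput.contains(a))

  def main(args: Array[String]): Unit = {
    println(s"df(col3) resolves to: $first")
    println(s"df(col1) resolves to: $second (left side: $pickedLeft)")
  }
}
```

In this model, an empty `first` corresponds to the AnalysisException, and `pickedLeft` being true corresponds to the wrong column being selected after the right outer join.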
To resolve this issue, I have introduced `LazilyDeterminedAttribute`.
It is returned when we refer to a column like `df("expr")`, and it lazily
determines which expression `df("expr")` should point to.
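The general idea of deferring the choice can be sketched as follows. This is not the code in the patch: `LazyRef`, `Side`, `originId`, and the ids are hypothetical, and the sketch assumes each join input remembers which DataFrame it came from. It only shows why a reference that is resolved late is unaffected by re-created exprIds.

```scala
object LazyResolutionSketch {
  final case class Attr(name: String, exprId: Long)

  // One input of a join: the DataFrame it came from and its (possibly re-created) output.
  final case class Side(originId: Int, output: Seq[Attr])

  // Hypothetical lazily determined reference: it keeps the originating DataFrame
  // and the column name instead of a fixed exprId.
  final case class LazyRef(originId: Int, name: String) {
    // Pick the side that came from the same DataFrame, then look the column up by name.
    def resolve(sides: Seq[Side]): Option[Attr] =
      sides.find(_.originId == originId).flatMap(_.output.find(_.name == name))
  }

  val dfId = 1
  val filteredId = 2

  // Left side is filtered; right side is df with re-created attributes.
  val left = Side(filteredId, Seq(Attr("col1", 3L), Attr("col2", 4L)))
  val right = Side(dfId, Seq(Attr("col1", 13L), Attr("col2", 14L), Attr("col3", 2L)))

  // Both references are resolved only once the joined plan is known.
  val col3 = LazyRef(dfId, "col3").resolve(Seq(left, right))
  val col1 = LazyRef(dfId, "col1").resolve(Seq(left, right))

  def main(args: Array[String]): Unit = {
    println(col3)
    println(col1)
  }
}
```

Here both references land on the right side's attributes, regardless of which exprIds the analyzer assigned to them.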
## How was this patch tested?
I added some test cases.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sarutak/spark SPARK-17154
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/14719.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #14719
----
commit 7c40aa8982c6050aa95f6a6fee2997ef9005c380
Author: Kousuke Saruta <[email protected]>
Date: 2016-08-19T16:44:35Z
Fixed the issue that self-join or similar join patterns can cause wrong
results
----