[ 
https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-47320:
-------------------------
    Description: 
Datasets involving self joins behave unintuitively with respect to when an 
AnalysisException is thrown due to ambiguity and when the query succeeds.

I found cases where merely swapping the join order makes a query throw an 
ambiguity-related exception that it otherwise does not. Some Datasets that are 
unambiguous from the user's perspective also result in an AnalysisException 
being thrown.

After testing and fixing a bug, I think the issue lies in an inconsistency in 
determining what is ambiguous and what is unambiguous.

There are two ways to look at resolution with regard to ambiguity:

1) ExprId of attributes: this is an unintuitive approach, as Spark users do not 
concern themselves with ExprIds.

2) Column extraction from the Dataset using the df(col) API: this is the 
user-visible, understandable point of view, so ambiguity should be determined 
on this basis. What is logically unambiguous from the user's perspective 
(assuming it is logically correct) should also be what Spark treats as 
unambiguous.
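
To make the two criteria concrete, here is a toy model in plain Scala. It is 
only a sketch: Attr, Ds, ColRef and AmbiguityCriteria are hypothetical names 
invented for illustration, not Spark classes, and the two checks are simplified 
stand-ins for the two perspectives, not Spark's actual analyzer logic.

```scala
// Toy model only: hypothetical stand-ins, NOT Spark classes.
final case class Attr(name: String, exprId: Long)
final case class Ds(id: String, output: Seq[Attr])
// A column obtained via a df(col)-style call: the attribute plus the
// identity of the dataset it was extracted from.
final case class ColRef(attr: Attr, fromDataset: String)

object AmbiguityCriteria {
  // Mimics df(col): look the column up by name and remember its origin.
  def col(ds: Ds, name: String): ColRef =
    ColRef(
      ds.output.find(_.name == name)
        .getOrElse(sys.error(s"no column $name in ${ds.id}")),
      ds.id)

  // Perspective #1: ambiguous when the exprId is present on both join sides.
  def ambiguousByExprId(ref: ColRef, left: Ds, right: Ds): Boolean =
    left.output.exists(_.exprId == ref.attr.exprId) &&
      right.output.exists(_.exprId == ref.attr.exprId)

  // Perspective #2: ambiguous only when the dataset the column was taken
  // from is itself both sides of the join.
  def ambiguousByDataset(ref: ColRef, left: Ds, right: Ds): Boolean =
    left.id == ref.fromDataset && right.id == ref.fromDataset

  def main(args: Array[String]): Unit = {
    val df1 = Ds("df1", Seq(Attr("a", 1L), Attr("b", 2L)))
    // The join result re-exposes df1's attributes with the same exprIds.
    val df1Joindf2 =
      Ds("df1Joindf2", Seq(Attr("a", 1L), Attr("aa", 3L), Attr("b", 2L)))
    val ref = col(df1, "a")
    println(ambiguousByExprId(ref, df1Joindf2, df1))  // true
    println(ambiguousByDataset(ref, df1Joindf2, df1)) // false
  }
}
```

In this model the same reference is flagged under #1 but accepted under #2, 
which is the mismatch this issue describes.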

For example:
{quote}
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"),
  df2("aa"), df1("b"))
val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a"))
{quote}

From perspective #1, the above code should throw an ambiguity exception, 
because the join condition and the projection of df3 both contain df1("a"), 
whose exprId matches attributes in both df1Joindf2 and df1.

But from the perspective of the Dataset used to obtain the column, which 
reflects the user's intent, the expectation is that df1("a") resolves to the 
Dataset df1 being joined, and not to df1Joindf2. If the user had intended "a" 
from df1Joindf2, they would have written df1Joindf2("a").

So in this case, current Spark throws an exception because it resolves based 
on #1.

But by the same logic, the DataFrame below should also throw an ambiguity 
exception, yet it passes:
{quote}
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"),
  df2("aa"), df1("b"))

df1Joindf2.join(df1, df1Joindf2("a") === df1("a"))
{quote}

The difference between the two cases is that the first has a select after the 
join, while the second does not.

This implies that in the first case df1("a") in the projection causes the 
ambiguity issue, but the same reference in the second case, used only in the 
join condition, is considered unambiguous.


IMHO, the ambiguity identification criteria should be based entirely on #2, 
and applied consistently.

In DataFrameJoinSuite and DataFrameSelfJoinSuite, some of the tests that are 
currently treated as ambiguous (under criterion #1) become unambiguous under 
criterion #2.

For example:
{quote}
test("SPARK-28344: fail ambiguous self join - column ref in join condition") {
  val df1 = spark.range(3)
  val df2 = df1.filter($"id" > 0)

  // ... (lines elided at a diff hunk boundary in DataFrameSelfJoinSuite)
  withSQLConf(
    SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true",
    SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
    assertAmbiguousSelfJoin(df1.join(df2, df1("id") > df2("id")))
  }
}
{quote}
The above test should not throw an ambiguity exception, as df1("id") and 
df2("id") are unambiguous from the perspective of the Dataset each was taken 
from.


There is an existing test in DataFrameSelfJoinSuite:
{quote}
test("SPARK-28344: fail ambiguous self join - column ref in Project") {
  val df1 = spark.range(3)
  val df2 = df1.filter($"id" > 0)

  withSQLConf(
    SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "false",
    SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
    // `df2("id")` actually points to the column of `df1`.
    checkAnswer(df1.join(df2).select(df2("id")), Seq(0, 0, 1, 1, 2, 2).map(Row(_)))

    // Alias the dataframe and use qualified column names can fix ambiguous self-join.
    val aliasedDf1 = df1.alias("left")
    val aliasedDf2 = df2.as("right")
    checkAnswer(
      aliasedDf1.join(aliasedDf2).select($"right.id"),
      Seq(1, 1, 1, 2, 2, 2).map(Row(_)))
  }

  withSQLConf(
    SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true",
    SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
    // Assertion1: existing
    assertAmbiguousSelfJoin(df1.join(df2).select(df2("id")))

    // Assertion2: added by me
    assertAmbiguousSelfJoin(df2.join(df1).select(df2("id")))
  }
}
{quote}

Here Assertion1 passes (that is, the ambiguity exception is thrown), 
but Assertion2 fails (that is, no ambiguity exception is thrown). 
The only change is the join order.

Logically, both assertions are invalid, in the sense that neither query should 
throw an exception, as from the user's perspective there is no ambiguity.


> Datasets involving self joins behave in an inconsistent and unintuitive manner
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-47320
>                 URL: https://issues.apache.org/jira/browse/SPARK-47320
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.1
>            Reporter: Asif
>            Priority: Major


