[ https://issues.apache.org/jira/browse/SPARK-36874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vincent Doba updated SPARK-36874:
---------------------------------
    Description: 
When joining two dataframes that share the same lineage, where one dataframe 
is a transformation of the other, Ambiguous Self Join detection only works when 
the transformed dataframe is on the right side of the join.

For instance, given {{df1}} and {{df2}} where {{df2}} is a filtered {{df1}}, 
Ambiguous Self Join detection only triggers when {{df2}} is the right dataframe 
(both orders are sketched just below):

- {{df1.join(df2, ...)}} correctly fails with an Ambiguous Self Join error
- {{df2.join(df1, ...)}} silently succeeds and returns an incorrect (empty) dataframe
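
A minimal sketch of both join orders, reusing the {{df1}} and {{df2}} definitions from the reproducible example below:

{code:scala}
// df1 and df2 as defined in the "Minimal Reproducible Example" below.

// Transformed dataframe (df2) on the right: detection works and the join
// fails with the expected AnalysisException (ambiguous column).
df1.join(df2, df1("key1") === df2("key2")).show()

// Transformed dataframe (df2) on the left: detection is skipped and an
// empty dataframe is returned instead of an error.
df2.join(df1, df1("key1") === df2("key2")).show()
{code}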

h1. Minimal Reproducible Example
h2. Code
{code:scala}
import sparkSession.implicits._

val df1 = Seq((1, 2, "A1"), (3, 4, "A2")).toDF("key1", "key2", "value")

// df2 shares df1's lineage: it is df1 with a filter applied
val df2 = df1.filter($"value" === "A2")

// df2 (the transformed dataframe) is on the left, so the ambiguous
// self join is not detected
df2.join(df1, df1("key1") === df2("key2")).show()
{code}
h2. Expected Result

The join should fail with the following exception:

{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: Column key2#11 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
        at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.apply(DetectAmbiguousSelfJoin.scala:157)
        at org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.apply(DetectAmbiguousSelfJoin.scala:43)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:196)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:190)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:155)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:174)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:173)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
        at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3715)
        at org.apache.spark.sql.Dataset.join(Dataset.scala:1079)
        at org.apache.spark.sql.Dataset.join(Dataset.scala:1041)
 ...
{code}
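
As the error message itself suggests, aliasing the datasets and using qualified column names avoids the ambiguity altogether; a minimal sketch of that workaround, reusing the example's dataframes:

{code:scala}
// Workaround sketched from the AnalysisException's own suggestion:
// alias both sides, then refer to columns through the aliases.
val joined = df2.as("l").join(df1.as("r"), $"r.key1" === $"l.key2")
joined.show()
{code}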
h2. Actual Result

No exception is thrown; an empty dataframe is returned instead:
{code}
+----+----+-----+----+----+-----+
|key1|key2|value|key1|key2|value|
+----+----+-----+----+----+-----+
+----+----+-----+----+----+-----+
{code}
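
For reference, the error message also mentions a flag that disables the detection entirely; a sketch of setting it (my understanding is that it defaults to {{true}}, yet the left-hand join above slips through anyway):

{code:scala}
// Explicitly disabling the check, as the error message describes.
// The bug reported here makes df2.join(df1, ...) behave as if this
// were set, even with the default configuration.
sparkSession.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")
{code}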

  was: (previous revision of the description, identical to the text above except that it also contained the following section, which this update removed)
h1. Related issue

https://issues.apache.org/jira/browse/SPARK-28344


> Ambiguous Self-Join detected only on right dataframe
> ----------------------------------------------------
>
>                 Key: SPARK-36874
>                 URL: https://issues.apache.org/jira/browse/SPARK-36874
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Vincent Doba
>            Priority: Major
>              Labels: correctness
>
> When joining two dataframes, if they share the same lineage and one dataframe 
> is a transformation of the other, Ambiguous Self Join detection only works 
> when transformed dataframe is the right dataframe. 
> For instance {{df1}} and {{df2}} where {{df2}} is a filtered {{df1}}, 
> Ambiguous Self Join detection only works when {{df2}} is the right dataframe:
> - {{df1.join(df2, ...)}} correctly fails with Ambiguous Self Join error
> - {{df2.join(df1, ...)}} returns a valid dataframe
> h1. Minimum Reproducible example
> h2. Code
> {code:scala}
> import sparkSession.implicit._
> val df1 = Seq((1, 2, "A1"),(3,4, "A2")).toDF("key1", "key2", "value")
> val df2 = df1.filter($"value" === "A2")
> df2.join(df1, df1("key1") === df2("key2")).show()
> {code}
> h2. Expected Result
> Throw the following exception:
> {code}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Column 
> key2#11 are ambiguous. It's probably because you joined several Datasets 
> together, and some of these Datasets are the same. This column points to one 
> of the Datasets but Spark is unable to figure out which one. Please alias the 
> Datasets with different names via `Dataset.as` before joining them, and 
> specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), 
> $"a.id" > $"b.id")`. You can also set 
> spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
>       at 
> org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.apply(DetectAmbiguousSelfJoin.scala:157)
>       at 
> org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin$.apply(DetectAmbiguousSelfJoin.scala:43)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:216)
>       at 
> scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
>       at 
> scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
>       at scala.collection.immutable.List.foldLeft(List.scala:91)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:213)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:205)
>       at scala.collection.immutable.List.foreach(List.scala:431)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:205)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:196)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:190)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:155)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183)
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
>       at 
> org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:174)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228)
>       at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:173)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73)
>       at 
> org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>       at 
> org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
>       at 
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73)
>       at 
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63)
>       at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
>       at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:3715)
>       at org.apache.spark.sql.Dataset.join(Dataset.scala:1079)
>       at org.apache.spark.sql.Dataset.join(Dataset.scala:1041)
>  ...
> {code}
> h2. Actual result
> Empty dataframe:
> {code:java}
> +----+----+-----+----+----+-----+
> |key1|key2|value|key1|key2|value|
> +----+----+-----+----+----+-----+
> +----+----+-----+----+----+-----+
> {code}


