[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214817510

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -754,11 +754,16 @@ class Analyzer(
    * a logical plan node's children.
    */
  object ResolveReferences extends Rule[LogicalPlan] {
+
+    private val emptyAttrMap = new AttributeMap[Attribute](Map.empty)
--- End diff --

I'd prefer to do [what I suggested previously](https://github.com/apache/spark/pull/22318#discussion_r214678100), as it would be easier to reuse if needed. With this approach, the next time we need an empty AttributeMap we have to create a new one, and we'd end up with several of them. cc @cloud-fan @maropu WDYT?

---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user peter-toth commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214793247

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    val a = spark.range(1, 5)
+    val b = spark.range(10)
+    val c = b.filter($"id" % 2 === 0)
+
+    val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner")
--- End diff --

That simpler join doesn't hit the issue; it is handled by a different rule, `ResolveNaturalAndUsingJoin`.

---
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214752480

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    val a = spark.range(1, 5)
+    val b = spark.range(10)
+    val c = b.filter($"id" % 2 === 0)
+
+    val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner")
--- End diff --

Why isn't this the simpler `a.join(b, "id").join(c, "id")`?

---
Github user peter-toth commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214732767

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -817,7 +819,7 @@ class Analyzer(
             case s: SubqueryExpression =>
               s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
           }
-        }
+        }, attributeRewrites)
--- End diff --

done

---
Github user peter-toth commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214732751

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -805,10 +807,10 @@ class Analyzer(
            * that this rule cannot handle. When that is the case, there must be another rule
            * that resolves these conflicts. Otherwise, the analysis will fail.
            */
-          right
+          (right, AttributeMap.empty[Attribute])
         case Some((oldRelation, newRelation)) =>
           val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
-          right transformUp {
+          (right transformUp {
--- End diff --

done

---
Github user peter-toth commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214732731

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -921,12 +930,16 @@ class Analyzer(
             failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF")

           // To resolve duplicate expression IDs for Join and Intersect
-          case j @ Join(left, right, _, _) if !j.duplicateResolved =>
-            j.copy(right = dedupRight(left, right))
+          case j @ Join(left, right, _, condition) if !j.duplicateResolved =>
+            val (dedupedRight, attributeRewrites) = dedupRight(left, right)
+            val changedCondition = condition.map(rewriteJoinCondition(_, attributeRewrites))
--- End diff --

thanks! fixed

---
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214692193

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -805,10 +807,10 @@ class Analyzer(
            * that this rule cannot handle. When that is the case, there must be another rule
            * that resolves these conflicts. Otherwise, the analysis will fail.
            */
-          right
+          (right, AttributeMap.empty[Attribute])
--- End diff --

I just commented the same in the meantime :) sorry, I only saw your comments now :)

---
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214677137

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -805,10 +807,10 @@ class Analyzer(
            * that this rule cannot handle. When that is the case, there must be another rule
            * that resolves these conflicts. Otherwise, the analysis will fail.
            */
-          right
+          (right, AttributeMap.empty[Attribute])
         case Some((oldRelation, newRelation)) =>
           val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
-          right transformUp {
+          (right transformUp {
--- End diff --

nit: it may be cleaner to do something like `val newPlan = ` and then return

```
(newPlan, attributeRewrites)
```

---
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214678100

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala ---
@@ -26,6 +26,8 @@ object AttributeMap {
   def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = {
     new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
   }
+
+  def empty[A](): AttributeMap[A] = new AttributeMap(Map.empty)
--- End diff --

I think it'd be better if we could avoid creating a new Map every time. In Scala, this is usually handled by creating a single empty object and returning it on every invocation of `empty` (e.g. see the `Nil` implementation for `List`). The only problem is that this would require changing `AttributeMap[A]` to `AttributeMap[+A]`. I am not sure whether that would break binary compatibility, and therefore whether it is an acceptable change. If it is, it would be great to do this, I think.

---
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214690245

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -805,10 +807,10 @@ class Analyzer(
            * that this rule cannot handle. When that is the case, there must be another rule
            * that resolves these conflicts. Otherwise, the analysis will fail.
            */
-          right
+          (right, AttributeMap.empty[Attribute])
--- End diff --

Since I don't want to build an empty object per call, how about defining an empty map in a field of this class? e.g.,

```
object ResolveReferences extends Rule[LogicalPlan] {

  private val emptyAttrMap = new AttributeMap[Attribute](Map.empty)
```

---
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214689283

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -817,7 +819,7 @@ class Analyzer(
             case s: SubqueryExpression =>
               s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
           }
-        }
+        }, attributeRewrites)
--- End diff --

Then, `(newRight, attributeRewrites)`.

---
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214689183

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -805,10 +807,10 @@ class Analyzer(
            * that this rule cannot handle. When that is the case, there must be another rule
            * that resolves these conflicts. Otherwise, the analysis will fail.
            */
-          right
+          (right, AttributeMap.empty[Attribute])
         case Some((oldRelation, newRelation)) =>
           val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
-          right transformUp {
+          (right transformUp {
--- End diff --

`(right transformUp {` -> `val newRight = right transformUp {`?

---
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214688902

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -921,12 +930,16 @@ class Analyzer(
             failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF")

           // To resolve duplicate expression IDs for Join and Intersect
-          case j @ Join(left, right, _, _) if !j.duplicateResolved =>
-            j.copy(right = dedupRight(left, right))
+          case j @ Join(left, right, _, condition) if !j.duplicateResolved =>
+            val (dedupedRight, attributeRewrites) = dedupRight(left, right)
+            val changedCondition = condition.map(rewriteJoinCondition(_, attributeRewrites))
--- End diff --

How about this?

```
val changedCondition = condition.map { _.transform {
  case attr: Attribute => attributeRewrites.getOrElse(attr, attr)
}}
```

Then remove `rewriteJoinCondition`?

---
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214670143

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -895,6 +897,13 @@ class Analyzer(
         case _ => e.mapChildren(resolve(_, q))
       }

+    private def rewriteJoinCondition(
+        e: Expression,
+        attributeRewrites: AttributeMap[Attribute]): Expression = e match {
+      case a: Attribute => attributeRewrites.getOrElse(a, a)
+      case _ => e.mapChildren(rewriteJoinCondition(_, attributeRewrites))
--- End diff --

oh, I see now, sorry.

---
Github user peter-toth commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214666748

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
+      val a = spark.range(1, 5)
+      val b = spark.range(10)
+      val c = b.filter($"id" % 2 === 0)
+
+      val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner")
--- End diff --

I think we do need `a` here. If we dropped `a`, the test would become:

```scala
val b = spark.range(1, 5)
val c = b.filter($"id" % 2 === 0)

val r = b.join(c, b("id") === c("id"), "inner")

checkAnswer(r, Row(2, 2) :: Row(4, 4) :: Nil)
```

and it would pass even without the fix. This is because Dataset has a special case to handle `id = id`-like conditions for `EqualTo` and `EqualNullSafe`. My fix comes into play in other cases, where an `AttributeReference` changes on the right side of a join due to deduplication.

---
Github user peter-toth commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r21451

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -895,6 +897,13 @@ class Analyzer(
         case _ => e.mapChildren(resolve(_, q))
       }

+    private def rewriteJoinCondition(
+        e: Expression,
+        attributeRewrites: AttributeMap[Attribute]): Expression = e match {
+      case a: Attribute => attributeRewrites.getOrElse(a, a)
+      case _ => e.mapChildren(rewriteJoinCondition(_, attributeRewrites))
--- End diff --

sorry, we can't do that, it wouldn't mean the same

---
Github user peter-toth commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214666333

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
--- End diff --

actually we can remove this

---
Github user peter-toth commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214666206

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
+      val a = spark.range(1, 5)
+      val b = spark.range(10)
+      val c = b.filter($"id" % 2 === 0)
+
+      val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner")
+
+      checkAnswer(r, Row(2, 2, 2) :: Row(4, 4, 4) :: Nil)
+    }
+  }
+
--- End diff --

done

---
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214601491

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -895,6 +897,13 @@ class Analyzer(
         case _ => e.mapChildren(resolve(_, q))
       }

+    private def rewriteJoinCondition(
+        e: Expression,
+        attributeRewrites: AttributeMap[Attribute]): Expression = e match {
+      case a: Attribute => attributeRewrites.getOrElse(a, a)
+      case _ => e.mapChildren(rewriteJoinCondition(_, attributeRewrites))
--- End diff --

nit: it may be clearer to do:

```
case other => e.mapChildren(rewriteJoinCondition(other, attributeRewrites))
```

---
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214573505

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
+      val a = spark.range(1, 5)
+      val b = spark.range(10)
+      val c = b.filter($"id" % 2 === 0)
+
+      val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner")
--- End diff --

Do we need the df `a` for this test? I think a simpler test is better.

---
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214573271

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
+      val a = spark.range(1, 5)
+      val b = spark.range(10)
+      val c = b.filter($"id" % 2 === 0)
+
+      val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner")
+
+      checkAnswer(r, Row(2, 2, 2) :: Row(4, 4, 4) :: Nil)
+    }
+  }
+
--- End diff --

nit: remove this empty line

---
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22318#discussion_r214573288

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -295,4 +295,17 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
+
+  test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") {
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
--- End diff --

`withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {`

---
GitHub user peter-toth opened a pull request:

https://github.com/apache/spark/pull/22318

[SPARK-25150][SQL] Fix attribute deduplication in join

## What changes were proposed in this pull request?

Fixes attribute deduplication in join conditions.

## How was this patch tested?

Added unit test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peter-toth/spark SPARK-25150

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22318.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22318

----
commit be7d8e7fb8439e3bb3238269263a37556e6bf9b1
Author: Peter Toth
Date:   2018-09-02T17:56:18Z

    [SPARK-25150][SQL] Fix attribute deduplication in join

---