Repository: spark Updated Branches: refs/heads/master 25f506e2a -> 657fd00b5
[SPARK-25988][SQL] Keep names unchanged when deduplicating the column names in Analyzer ## What changes were proposed in this pull request? When the queries do not use the column names with the same case, users might hit various errors. Below is a typical test failure they can hit. ``` Expected only partition pruning predicates: ArrayBuffer(isnotnull(tdate#237), (cast(tdate#237 as string) >= 2017-08-15)); org.apache.spark.sql.AnalysisException: Expected only partition pruning predicates: ArrayBuffer(isnotnull(tdate#237), (cast(tdate#237 as string) >= 2017-08-15)); at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.prunePartitionsByFilter(ExternalCatalogUtils.scala:146) at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.listPartitionsByFilter(InMemoryCatalog.scala:560) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilter(SessionCatalog.scala:925) ``` ## How was this patch tested? Added two test cases. Closes #22990 from gatorsmile/fix1283. Authored-by: gatorsmile <gatorsm...@gmail.com> Signed-off-by: gatorsmile <gatorsm...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/657fd00b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/657fd00b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/657fd00b Branch: refs/heads/master Commit: 657fd00b5204859c2e6d7c19a71a3ec5ecf7c869 Parents: 25f506e Author: gatorsmile <gatorsm...@gmail.com> Authored: Fri Nov 9 08:22:26 2018 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Fri Nov 9 08:22:26 2018 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 +- .../sql/catalyst/analysis/unresolved.scala | 1 + .../catalyst/expressions/namedExpressions.scala | 5 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 53 ++++++++++++++++++++ 4 files changed, 60 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/657fd00b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c2d22c5..6dc5b3f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -824,7 +824,8 @@ class Analyzer( } private def dedupAttr(attr: Attribute, attrMap: AttributeMap[Attribute]): Attribute = { - attrMap.get(attr).getOrElse(attr).withQualifier(attr.qualifier) + val exprId = attrMap.getOrElse(attr, attr).exprId + attr.withExprId(exprId) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/657fd00b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 857cf38..36cad3c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -112,6 +112,7 @@ case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Un override def withQualifier(newQualifier: Seq[String]): UnresolvedAttribute = this override def withName(newName: String): UnresolvedAttribute = UnresolvedAttribute.quoted(newName) override def withMetadata(newMetadata: Metadata): Attribute = this + override def withExprId(newExprId: ExprId): UnresolvedAttribute = this override def toString: String = s"'$name" http://git-wip-us.apache.org/repos/asf/spark/blob/657fd00b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 584a294..049ea77 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -115,6 +115,7 @@ abstract class Attribute extends LeafExpression with NamedExpression with NullIn def withQualifier(newQualifier: Seq[String]): Attribute def withName(newName: String): Attribute def withMetadata(newMetadata: Metadata): Attribute + def withExprId(newExprId: ExprId): Attribute override def toAttribute: Attribute = this def newInstance(): Attribute @@ -299,7 +300,7 @@ case class AttributeReference( } } - def withExprId(newExprId: ExprId): AttributeReference = { + override def withExprId(newExprId: ExprId): AttributeReference = { if (exprId == newExprId) { this } else { @@ -362,6 +363,8 @@ case class PrettyAttribute( throw new UnsupportedOperationException override def qualifier: Seq[String] = throw new UnsupportedOperationException override def exprId: ExprId = throw new UnsupportedOperationException + override def withExprId(newExprId: ExprId): Attribute = + throw new UnsupportedOperationException override def nullable: Boolean = true } http://git-wip-us.apache.org/repos/asf/spark/blob/657fd00b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 631ab1b..dbb0790 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2856,6 +2856,59 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer(sql("select 26393499451 / (1e6 * 1000)"), Row(BigDecimal("26.3934994510000"))) } } + + test("SPARK-25988: self join with aliases on partitioned tables #1") { + withTempView("tmpView1", "tmpView2") { + withTable("tab1", "tab2") { + sql( + """ + |CREATE TABLE `tab1` (`col1` INT, `TDATE` DATE) + |USING CSV + |PARTITIONED BY (TDATE) + """.stripMargin) + spark.table("tab1").where("TDATE >= '2017-08-15'").createOrReplaceTempView("tmpView1") + sql("CREATE TABLE `tab2` (`TDATE` DATE) USING parquet") + sql( + """ + |CREATE OR REPLACE TEMPORARY VIEW tmpView2 AS + |SELECT N.tdate, col1 AS aliasCol1 + |FROM tmpView1 N + |JOIN tab2 Z + |ON N.tdate = Z.tdate + """.stripMargin) + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { + sql("SELECT * FROM tmpView2 x JOIN tmpView2 y ON x.tdate = y.tdate").collect() + } + } + } + } + + test("SPARK-25988: self join with aliases on partitioned tables #2") { + withTempView("tmp") { + withTable("tab1", "tab2") { + sql( + """ + |CREATE TABLE `tab1` (`EX` STRING, `TDATE` DATE) + |USING parquet + |PARTITIONED BY (tdate) + """.stripMargin) + sql("CREATE TABLE `tab2` (`TDATE` DATE) USING parquet") + sql( + """ + |CREATE OR REPLACE TEMPORARY VIEW TMP as + |SELECT N.tdate, EX AS new_ex + |FROM tab1 N + |JOIN tab2 Z + |ON N.tdate = Z.tdate + """.stripMargin) + sql( + """ + |SELECT * FROM TMP x JOIN TMP y + |ON x.tdate = y.tdate + """.stripMargin).queryExecution.executedPlan + } + } + } } case class Foo(bar: Option[String]) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org