Repository: spark Updated Branches: refs/heads/master f6cc354d8 -> 891032da6
[SPARK-25691][SQL] Use semantic equality in AliasViewChild in order to compare attributes ## What changes were proposed in this pull request? When we compare attributes, we should in general always rely on semantic equality, because the default `equals` method can return false when there are only "cosmetic" differences between them, even though they refer to the same thing, or at least must be treated as the same thing when analyzing/optimizing queries. The PR focuses on the usage and comparison of the `output` of a `LogicalPlan`, which is a `Seq[Attribute]`, in `AliasViewChild`. There, comparing the outputs with plain `==`/`!=` does not check semantic equality. This results in the operator failing to stabilize. ## How was this patch tested? Running the tests with the patch provided by maryannxue in https://github.com/apache/spark/pull/22060 Closes #22713 from mgaido91/SPARK-25691. Authored-by: Marco Gaido <marcogaid...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/891032da Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/891032da Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/891032da Branch: refs/heads/master Commit: 891032da6f5b3c6a690e2ae44396873aa6a6b91d Parents: f6cc354 Author: Marco Gaido <marcogaid...@gmail.com> Authored: Wed Oct 31 09:18:53 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Oct 31 09:18:53 2018 +0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/view.scala | 8 +++---- .../sql/catalyst/optimizer/Optimizer.scala | 5 +--- .../catalyst/plans/logical/LogicalPlan.scala | 14 +++++++++++ .../sql/catalyst/analysis/AnalysisSuite.scala | 25 +++++++++++++++++++- 4 files changed, 43 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/891032da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala index af74693..6134d54 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.internal.SQLConf */ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { - case v @ View(desc, output, child) if child.resolved && output != child.output => + case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) => val resolver = conf.resolver val queryColumnNames = desc.viewQueryColumnNames val queryOutput = if (queryColumnNames.nonEmpty) { @@ -70,7 +70,7 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp } // Map the attributes in the query output to the attributes in the view output by index. val newOutput = output.zip(queryOutput).map { - case (attr, originAttr) if attr != originAttr => + case (attr, originAttr) if !attr.semanticEquals(originAttr) => // The dataType of the output attributes may be not the same with that of the view // output, so we should cast the attribute to the dataType of the view output attribute. // Will throw an AnalysisException if the cast can't perform or might truncate. @@ -112,8 +112,8 @@ object EliminateView extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // The child should have the same output attributes with the View operator, so we simply // remove the View operator. 
- case View(_, output, child) => - assert(output == child.output, + case v @ View(_, output, child) => + assert(v.sameOutput(child), s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " + s"view output ${output.mkString("[", ",", "]")}") child http://git-wip-us.apache.org/repos/asf/spark/blob/891032da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index da8009d..95455ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -530,9 +530,6 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper * p2 is usually inserted by this rule and useless, p1 could prune the columns anyway. 
*/ object ColumnPruning extends Rule[LogicalPlan] { - private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = - output1.size == output2.size && - output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan transform { // Prunes the unused columns from project list of Project/Aggregate/Expand @@ -607,7 +604,7 @@ object ColumnPruning extends Rule[LogicalPlan] { case w: Window if w.windowExpressions.isEmpty => w.child // Eliminate no-op Projects - case p @ Project(_, child) if sameOutput(child.output, p.output) => child + case p @ Project(_, child) if child.sameOutput(p) => child // Can't prune the columns on LeafNode case p @ Project(_, _: LeafNode) => p http://git-wip-us.apache.org/repos/asf/spark/blob/891032da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 339fbb8..a520eba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -130,6 +130,20 @@ abstract class LogicalPlan * Returns the output ordering that this plan generates. */ def outputOrdering: Seq[SortOrder] = Nil + + /** + * Returns true iff `other`'s output is semantically the same, ie.: + * - it contains the same number of `Attribute`s; + * - references are the same; + * - the order is equal too. 
+ */ + def sameOutput(other: LogicalPlan): Boolean = { + val thisOutput = this.output + val otherOutput = other.output + thisOutput.length == otherOutput.length && thisOutput.zip(otherOutput).forall { + case (a1, a2) => a1.semanticEquals(a2) + } + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/891032da/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index d8cb6f7..da3ae72 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.analysis -import java.util.TimeZone +import java.util.{Locale, TimeZone} import scala.reflect.ClassTag @@ -25,6 +25,7 @@ import org.scalatest.Matchers import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, Inner} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} +import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -604,4 +606,25 @@ class AnalysisSuite extends AnalysisTest with Matchers { checkAnalysis(input, expected) } } + + 
test("SPARK-25691: AliasViewChild with different nullabilities") { + object ViewAnalyzer extends RuleExecutor[LogicalPlan] { + val batches = Batch("View", Once, AliasViewChild(conf), EliminateView) :: Nil + } + val relation = LocalRelation('a.int.notNull, 'b.string) + val view = View(CatalogTable( + identifier = TableIdentifier("v1"), + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))), + output = Seq('a.int, 'b.string), + child = relation) + val tz = Option(conf.sessionLocalTimeZone) + val expected = Project(Seq( + Alias(Cast('a.int.notNull, IntegerType, tz), "a")(), + Alias(Cast('b.string, StringType, tz), "b")()), + relation) + val res = ViewAnalyzer.execute(view) + comparePlans(res, expected) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org