Repository: spark
Updated Branches:
  refs/heads/master f6cc354d8 -> 891032da6


[SPARK-25691][SQL] Use semantic equality in AliasViewChild in order to compare attributes

## What changes were proposed in this pull request?

When we compare attributes, we should in general rely on semantic equality: the 
default `equals` method can return false when two attributes differ only in 
"cosmetic" ways (for example, nullability), even though they refer to the same 
thing, and the analyzer/optimizer has to treat them as the same.
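
As a minimal, standalone sketch (not part of this patch; the attribute name and 
`ExprId` are made up for illustration), two `AttributeReference`s sharing an 
`ExprId` and data type but differing only in nullability compare unequal with the 
default `==`, yet equal with `semanticEquals`:

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
    import org.apache.spark.sql.types.IntegerType

    val id = ExprId(1)
    val a1 = AttributeReference("a", IntegerType, nullable = true)(exprId = id)
    val a2 = AttributeReference("a", IntegerType, nullable = false)(exprId = id)

    assert(a1 != a2)              // default equality sees the nullability difference
    assert(a1.semanticEquals(a2)) // semantic equality compares canonicalized forms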

The PR focuses on how `AliasViewChild` uses and compares the `output` of a 
`LogicalPlan`, which is a `Seq[Attribute]`. Comparing outputs with plain equality 
implicitly skips the semantic check, so the rule keeps firing and the operator 
never stabilizes.
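
The patch adds this check as `LogicalPlan.sameOutput` (see the diff below): a 
pairwise `semanticEquals` over the two outputs. A standalone sketch of the same 
idea:

    import org.apache.spark.sql.catalyst.expressions.Attribute

    // Two outputs match when they have the same length and each pair of
    // attributes, taken in order, is semantically equal.
    def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
      output1.length == output2.length && output1.zip(output2).forall {
        case (a1, a2) => a1.semanticEquals(a2)
      }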

## How was this patch tested?

Ran the tests with the patch provided by maryannxue in 
https://github.com/apache/spark/pull/22060.

Closes #22713 from mgaido91/SPARK-25691.

Authored-by: Marco Gaido <marcogaid...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/891032da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/891032da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/891032da

Branch: refs/heads/master
Commit: 891032da6f5b3c6a690e2ae44396873aa6a6b91d
Parents: f6cc354
Author: Marco Gaido <marcogaid...@gmail.com>
Authored: Wed Oct 31 09:18:53 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Oct 31 09:18:53 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/view.scala      |  8 +++----
 .../sql/catalyst/optimizer/Optimizer.scala      |  5 +---
 .../catalyst/plans/logical/LogicalPlan.scala    | 14 +++++++++++
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 25 +++++++++++++++++++-
 4 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/891032da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index af74693..6134d54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.internal.SQLConf
  */
 case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-    case v @ View(desc, output, child) if child.resolved && output != child.output =>
+    case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) =>
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames
       val queryOutput = if (queryColumnNames.nonEmpty) {
@@ -70,7 +70,7 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp
       }
       // Map the attributes in the query output to the attributes in the view output by index.
       val newOutput = output.zip(queryOutput).map {
-        case (attr, originAttr) if attr != originAttr =>
+        case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
           // The dataType of the output attributes may be not the same with that of the view
           // output, so we should cast the attribute to the dataType of the view output attribute.
           // Will throw an AnalysisException if the cast can't perform or might truncate.
@@ -112,8 +112,8 @@ object EliminateView extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // The child should have the same output attributes with the View operator, so we simply
     // remove the View operator.
-    case View(_, output, child) =>
-      assert(output == child.output,
+    case v @ View(_, output, child) =>
+      assert(v.sameOutput(child),
         s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
           s"view output ${output.mkString("[", ",", "]")}")
       child

http://git-wip-us.apache.org/repos/asf/spark/blob/891032da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index da8009d..95455ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -530,9 +530,6 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper
  * p2 is usually inserted by this rule and useless, p1 could prune the columns anyway.
  */
 object ColumnPruning extends Rule[LogicalPlan] {
-  private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
-    output1.size == output2.size &&
-      output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
 
   def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan transform {
     // Prunes the unused columns from project list of Project/Aggregate/Expand
@@ -607,7 +604,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case w: Window if w.windowExpressions.isEmpty => w.child
 
     // Eliminate no-op Projects
-    case p @ Project(_, child) if sameOutput(child.output, p.output) => child
+    case p @ Project(_, child) if child.sameOutput(p) => child
 
     // Can't prune the columns on LeafNode
     case p @ Project(_, _: LeafNode) => p

http://git-wip-us.apache.org/repos/asf/spark/blob/891032da/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 339fbb8..a520eba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -130,6 +130,20 @@ abstract class LogicalPlan
    * Returns the output ordering that this plan generates.
    */
   def outputOrdering: Seq[SortOrder] = Nil
+
+  /**
+   * Returns true iff `other`'s output is semantically the same, i.e.:
+   *  - it contains the same number of `Attribute`s;
+   *  - the references are the same;
+   *  - they appear in the same order.
+   */
+  def sameOutput(other: LogicalPlan): Boolean = {
+    val thisOutput = this.output
+    val otherOutput = other.output
+    thisOutput.length == otherOutput.length && thisOutput.zip(otherOutput).forall {
+      case (a1, a2) => a1.semanticEquals(a2)
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/891032da/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index d8cb6f7..da3ae72 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import java.util.TimeZone
+import java.util.{Locale, TimeZone}
 
 import scala.reflect.ClassTag
 
@@ -25,6 +25,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning,
   RangePartitioning, RoundRobinPartitioning}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -604,4 +606,25 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       checkAnalysis(input, expected)
     }
   }
+
+  test("SPARK-25691: AliasViewChild with different nullabilities") {
+    object ViewAnalyzer extends RuleExecutor[LogicalPlan] {
+      val batches = Batch("View", Once, AliasViewChild(conf), EliminateView) :: Nil
+    }
+    val relation = LocalRelation('a.int.notNull, 'b.string)
+    val view = View(CatalogTable(
+        identifier = TableIdentifier("v1"),
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))),
+      output = Seq('a.int, 'b.string),
+      child = relation)
+    val tz = Option(conf.sessionLocalTimeZone)
+    val expected = Project(Seq(
+        Alias(Cast('a.int.notNull, IntegerType, tz), "a")(),
+        Alias(Cast('b.string, StringType, tz), "b")()),
+      relation)
+    val res = ViewAnalyzer.execute(view)
+    comparePlans(res, expected)
+  }
 }

