[ https://issues.apache.org/jira/browse/SPARK-26352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723050#comment-16723050 ]

ASF GitHub Bot commented on SPARK-26352:
----------------------------------------

cloud-fan closed pull request #23333: [SPARK-26352][SQL][FOLLOWUP-2.3] Fix missing sameOutput in branch-2.3
URL: https://github.com/apache/spark/pull/23333
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index fedef68bf8513..503e20490a92c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -99,7 +99,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
         createOrderedJoin(input, conditions)
       }
 
-      if (p.sameOutput(reordered)) {
+      if (sameOutput(p, reordered)) {
         reordered
       } else {
         // Reordering the joins have changed the order of the columns.
@@ -107,6 +107,21 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
         Project(p.output, reordered)
       }
   }
+
+  /**
+   * Returns true iff output of both plans are semantically the same, ie.:
+   *  - they contain the same number of `Attribute`s;
+   *  - references are the same;
+   *  - the order is equal too.
+   * NOTE: this is copied over from SPARK-25691 from master.
+   */
+  def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+    val output1 = plan1.output
+    val output2 = plan2.output
+    output1.length == output2.length && output1.zip(output2).forall {
+      case (a1, a2) => a1.semanticEquals(a2)
+    }
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index c8a4b6da4fcd0..9526cbca77094 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -300,8 +300,8 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     val optimized = Optimize.execute(analyzed)
     val expected = groundTruthBestPlan.analyze
 
-    assert(analyzed.sameOutput(expected)) // if this fails, the expected plan itself is incorrect
-    assert(analyzed.sameOutput(optimized))
+    assert(sameOutput(analyzed, expected)) // if this fails, the expected plan itself is incorrect
+    assert(sameOutput(analyzed, optimized))
 
     compareJoinOrder(optimized, expected)
   }
@@ -309,4 +309,19 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
   private def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
     plans.map(_.output).reduce(_ ++ _)
   }
+
+  /**
+   * Returns true iff output of both plans are semantically the same, ie.:
+   *  - they contain the same number of `Attribute`s;
+   *  - references are the same;
+   *  - the order is equal too.
+   * NOTE: this is copied over from SPARK-25691 from master.
+   */
+  def sameOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+    val output1 = plan1.output
+    val output2 = plan2.output
+    output1.length == output2.length && output1.zip(output2).forall {
+      case (a1, a2) => a1.semanticEquals(a2)
+    }
+  }
 }
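
The helper added in both files compares outputs positionally. Below is a minimal, Spark-free sketch of that logic, assuming a hypothetical `Attr` case class whose `semanticEquals` compares only an expression ID. This is a deliberate simplification: Catalyst's real `semanticEquals` works on canonicalized expressions. It uses the two column orders printed in the issue quoted below to show why an order-insensitive comparison would not have caught the bug, and why `ReorderJoin` falls back to `Project(p.output, reordered)` when the check fails.

```scala
// Hypothetical, simplified stand-in for a Catalyst attribute: two attributes
// are treated as semantically equal when their expression IDs match
// (names are ignored). Not the real Catalyst AttributeReference.
final case class Attr(name: String, exprId: Long) {
  def semanticEquals(other: Attr): Boolean = exprId == other.exprId
}

object SameOutputSketch {
  // Same shape as the helper in the diff: equal length plus pairwise,
  // position-by-position semantic equality. A permutation of the same
  // attributes therefore does NOT count as the same output.
  def sameOutput(output1: Seq[Attr], output2: Seq[Attr]): Boolean =
    output1.length == output2.length && output1.zip(output2).forall {
      case (a1, a2) => a1.semanticEquals(a2)
    }
}

// Column orders from the issue: analyzed plan vs. reordered plan.
val x = Attr("x", 1L); val y = Attr("y", 2L); val i = Attr("i", 3L)
val j = Attr("j", 4L); val a = Attr("a", 5L); val b = Attr("b", 6L)
val analyzedOutput = Seq(x, y, i, j, a, b)
val reorderedOutput = Seq(x, y, a, b, i, j)

println(SameOutputSketch.sameOutput(analyzedOutput, analyzedOutput))  // true
println(SameOutputSketch.sameOutput(analyzedOutput, reorderedOutput)) // false: same attributes, different order
println(analyzedOutput.toSet == reorderedOutput.toSet)                // true: a set-based check would miss the bug
```

Comparing with `zip` plus `forall` after a length check keeps the test linear and makes the order part of the contract, which is exactly the property the reordering broke.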


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


> join reordering should not change the order of output attributes
> ----------------------------------------------------------------
>
>                 Key: SPARK-26352
>                 URL: https://issues.apache.org/jira/browse/SPARK-26352
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Kris Mok
>            Assignee: Kris Mok
>            Priority: Major
>             Fix For: 2.3.3, 2.4.1, 3.0.0
>
>
> The optimizer rule {{org.apache.spark.sql.catalyst.optimizer.ReorderJoin}} 
> performs join reordering on inner joins. It was introduced by SPARK-12032 
> in December 2015.
> After reordering the joins, however, it did not check whether the column 
> order (in terms of the {{output}} attribute list) was still the same as 
> before. As a result, the reordered column order could mismatch the schema 
> that the DataFrame believes it has.
> This can be demonstrated with the following example:
> {code:none}
> spark.sql("create table table_a (x int, y int) using parquet")
> spark.sql("create table table_b (i int, j int) using parquet")
> spark.sql("create table table_c (a int, b int) using parquet")
> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
> {code}
> Here's what the DataFrame thinks:
> {code:none}
> scala> df.printSchema
> root
>  |-- x: integer (nullable = true)
>  |-- y: integer (nullable = true)
>  |-- i: integer (nullable = true)
>  |-- j: integer (nullable = true)
>  |-- a: integer (nullable = true)
>  |-- b: integer (nullable = true)
> {code}
> Here's what the optimized plan thinks after join reordering:
> {code:none}
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- a: integer
> |-- b: integer
> |-- i: integer
> |-- j: integer
> {code}
> If we exclude the {{ReorderJoin}} rule (using Spark 2.4's optimizer rule 
> exclusion feature), it's back to normal:
> {code:none}
> scala> spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ReorderJoin")
> scala> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
> df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields]
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- i: integer
> |-- j: integer
> |-- a: integer
> |-- b: integer
> {code}
> Note that this column ordering problem leads to data corruption, which can 
> manifest itself in various symptoms:
> * Silent data corruption: if the reordered columns happen to have matching 
> types, or sufficiently compatible types (e.g. all fixed-length primitive 
> types are considered "sufficiently compatible" in an UnsafeRow), then only 
> the resulting data will be wrong, and this might not trigger any alarms 
> immediately; or
> * Weird Java-level exceptions like {{java.lang.NegativeArraySizeException}}, 
> or even SIGSEGVs.
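
To make the first symptom concrete, here is a toy sketch in plain Scala (no Spark). It assumes rows are `Seq[Int]` whose values are accessed by ordinal, and it reuses the two column orders printed in the issue. Reading a reordered row against the original schema silently mislabels values when the types match; picking columns by name, a hypothetical stand-in for what `Project(p.output, reordered)` achieves in the fix, restores the expected layout. `ColumnOrderSketch`, `readPositionally`, and `project` are illustrative names, not Spark APIs, and the sketch only covers the matching-types case.

```scala
object ColumnOrderSketch {
  // Column order the DataFrame believes it has (df.printSchema in the issue).
  val expectedSchema: Seq[String] = Seq("x", "y", "i", "j", "a", "b")
  // Column order actually produced after join reordering (optimizedPlan.output).
  val reorderedSchema: Seq[String] = Seq("x", "y", "a", "b", "i", "j")

  // Positional read: value k is labelled schema(k), whichever column produced it.
  def readPositionally(schema: Seq[String], row: Seq[Int]): Map[String, Int] =
    schema.zip(row).toMap

  // Toy analogue of Project(p.output, reordered): pick each expected column
  // out of the reordered row by name, restoring the expected layout.
  def project(expected: Seq[String], actual: Seq[String], row: Seq[Int]): Seq[Int] = {
    val position = actual.zipWithIndex.toMap
    expected.map(name => row(position(name)))
  }
}

// Arbitrary illustrative values, laid out in the reordered column order:
// x = 1, y = 2, a = 5, b = 6, i = 3, j = 4.
val reorderedRow = Seq(1, 2, 5, 6, 3, 4)

val misread = ColumnOrderSketch.readPositionally(ColumnOrderSketch.expectedSchema, reorderedRow)
println(misread("i")) // 5: actually the value of a, and no error is raised

val restored = ColumnOrderSketch.project(
  ColumnOrderSketch.expectedSchema, ColumnOrderSketch.reorderedSchema, reorderedRow)
println(restored) // List(1, 2, 3, 4, 5, 6)
```

When the column types differ instead (e.g. an int slot read as a variable-length field), the same misalignment is what can surface as the exceptions or crashes listed above.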



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
