Repository: spark
Updated Branches:
  refs/heads/master 3ff766f61 -> ec122209f


[SPARK-21165][SQL] FileFormatWriter should handle mismatched attribute ids between logical and physical plan

## What changes were proposed in this pull request?

Due to optimizer removing some unnecessary aliases, the logical and physical 
plan may have different output attribute ids. FileFormatWriter should handle 
this when creating the physical sort node.

## How was this patch tested?

new regression test.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19483 from cloud-fan/bug2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec122209
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec122209
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec122209

Branch: refs/heads/master
Commit: ec122209fb35a65637df42eded64b0203e105aae
Parents: 3ff766f
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Oct 13 13:09:35 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Oct 13 13:09:35 2017 +0800

----------------------------------------------------------------------
 .../datasources/FileFormatWriter.scala          |  7 ++++++-
 .../datasources/FileFormatWriterSuite.scala     |  2 +-
 .../org/apache/spark/sql/hive/InsertSuite.scala | 22 ++++++++++++++++++++
 3 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec122209/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 75b1695..1fac01a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -180,8 +180,13 @@ object FileFormatWriter extends Logging {
       val rdd = if (orderingMatched) {
         queryExecution.toRdd
       } else {
+        // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
+        // the physical plan may have different attribute ids due to optimizer removing some
+        // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
+        val orderingExpr = requiredOrdering
+          .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns))
         SortExec(
-          requiredOrdering.map(SortOrder(_, Ascending)),
+          orderingExpr,
           global = false,
           child = queryExecution.executedPlan).execute()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/ec122209/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index 6f8767d..13f0e0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -32,7 +32,7 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("FileFormatWriter should respect the input query schema") {
+  test("SPARK-22252: FileFormatWriter should respect the input query schema") {
     withTable("t1", "t2", "t3", "t4") {
       spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
       spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")

http://git-wip-us.apache.org/repos/asf/spark/blob/ec122209/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index aa5cae3..ab91727 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -728,4 +728,26 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter {
       assert(e.contains("mismatched input 'ROW'"))
     }
   }
+
+  test("SPARK-21165: FileFormatWriter should only rely on attributes from analyzed plan") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      withTable("tab1", "tab2") {
+        Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1")
+
+        spark.sql(
+          """
+            |CREATE TABLE tab2 (word string, length int)
+            |PARTITIONED BY (first string)
+          """.stripMargin)
+
+        spark.sql(
+          """
+            |INSERT INTO TABLE tab2 PARTITION(first)
+            |SELECT word, length, cast(first as string) as first FROM tab1
+          """.stripMargin)
+
+        checkAnswer(spark.table("tab2"), Row("a", 3, "b"))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to