Repository: spark
Updated Branches:
  refs/heads/master abdb5d42c -> 0ec279ffd


[SPARK-15753][SQL] Move Analyzer stuff to Analyzer from DataFrameWriter

## What changes were proposed in this pull request?

This patch moves some codes in `DataFrameWriter.insertInto` that belongs to 
`Analyzer`.

## How was this patch tested?
Existing tests.

Author: Liang-Chi Hsieh <sim...@tw.ibm.com>

Closes #13496 from viirya/move-analyzer-stuff.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ec279ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ec279ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ec279ff

Branch: refs/heads/master
Commit: 0ec279ffdf92853965e327a9f0f6956cacb7a23e
Parents: abdb5d4
Author: Liang-Chi Hsieh <sim...@tw.ibm.com>
Authored: Fri Jun 10 11:05:04 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Jun 10 11:05:04 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 17 ++++++++++++++---
 .../org/apache/spark/sql/DataFrameWriter.scala     | 12 +-----------
 .../spark/sql/hive/execution/HiveQuerySuite.scala  |  4 ++--
 3 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d1ca99f..58f3904 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -452,6 +452,17 @@ class Analyzer(
 
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if 
child.resolved =>
+        // A partitioned relation's schema can be different from the input 
logicalPlan, since
+        // partition columns are all moved after data columns. We Project to 
adjust the ordering.
+        val input = if (parts.nonEmpty) {
+          val (inputPartCols, inputDataCols) = child.output.partition { attr =>
+            parts.contains(attr.name)
+          }
+          Project(inputDataCols ++ inputPartCols, child)
+        } else {
+          child
+        }
+
         val table = lookupTableFromCatalog(u)
         // adding the table's partitions or validate the query's partition info
         table match {
@@ -467,8 +478,8 @@ class Analyzer(
                      |Requested partitions: ${parts.keys.mkString(",")}
                      |Table partitions: 
${tablePartitionNames.mkString(",")}""".stripMargin)
               }
-              // Assume partition columns are correctly placed at the end of 
the child's output
-              i.copy(table = EliminateSubqueryAliases(table))
+              // Partition columns are already correctly placed at the end of 
the child's output
+              i.copy(table = EliminateSubqueryAliases(table), child = input)
             } else {
               // Set up the table's partition scheme with all dynamic 
partitions by moving partition
               // columns to the end of the column list, in partition order.
@@ -486,7 +497,7 @@ class Analyzer(
                 child = Project(columns ++ partColumns, child))
             }
           case _ =>
-            i.copy(table = EliminateSubqueryAliases(table))
+            i.copy(table = EliminateSubqueryAliases(table), child = input)
         }
       case u: UnresolvedRelation =>
         val table = u.tableIdentifier

http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 32e2fdc..6ce59e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -505,21 +505,11 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
     val partitions = normalizedParCols.map(_.map(col => col -> (None: 
Option[String])).toMap)
     val overwrite = mode == SaveMode.Overwrite
 
-    // A partitioned relation's schema can be different from the input 
logicalPlan, since
-    // partition columns are all moved after data columns. We Project to 
adjust the ordering.
-    // TODO: this belongs to the analyzer.
-    val input = normalizedParCols.map { parCols =>
-      val (inputPartCols, inputDataCols) = df.logicalPlan.output.partition { 
attr =>
-        parCols.contains(attr.name)
-      }
-      Project(inputDataCols ++ inputPartCols, df.logicalPlan)
-    }.getOrElse(df.logicalPlan)
-
     df.sparkSession.sessionState.executePlan(
       InsertIntoTable(
         UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
-        input,
+        df.logicalPlan,
         overwrite,
         ifNotExists = false)).toRdd
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0ec279ff/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e0f6ccf..0a2bab4 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1042,7 +1042,7 @@ class HiveQuerySuite extends HiveComparisonTest with 
BeforeAndAfter {
         .queryExecution.analyzed
     }
 
-    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+    assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
       analyzedPlan.collect {
         case _: Project => ()
       }.size
@@ -1061,7 +1061,7 @@ class HiveQuerySuite extends HiveComparisonTest with 
BeforeAndAfter {
         .queryExecution.analyzed
     }
 
-    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+    assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
       analyzedPlan.collect {
         case _: Project => ()
       }.size


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to