Repository: spark
Updated Branches:
  refs/heads/master ccdf21f56 -> 274f0efef


[SPARK-22252][SQL] FileFormatWriter should respect the input query schema

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/18064, we allowed `RunnableCommand` to
have children in order to fix some UI issues. We then made the `InsertIntoXXX`
commands take the input `query` as a child, and when we do the actual writing we
simply pass the physical plan to the writer (`FileFormatWriter.write`).

However, this is problematic. In Spark SQL, the optimizer and planner are allowed
to change the schema names slightly. For example, the `ColumnPruning` rule removes
no-op `Project`s such as `Project("A", Scan("a"))`, and thus changes the output
schema from `<A: int>` to `<a: int>`. When it comes to writing, especially for
self-describing data formats like Parquet, we may write the wrong schema to the
file and end up with null values on the read path.
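
For example (a minimal reproduction, adapted from the regression test added in this
PR; the table names here are just illustrative):

```scala
// The table is created with lower-case column names: col1, col2.
spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")

// The SELECT refers to the columns in upper case. Once the optimizer drops the
// no-op Project, the physical plan may expose the upper-case attribute names,
// and the Parquet files of t2 could record COL1/COL2 while the table metadata
// still says col1/col2.
spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")

// Before this fix, reading t2 could then return nulls instead of Row(0, 0).
spark.table("t2").show()
```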

Fortunately, in https://github.com/apache/spark/pull/18450 we decided to allow
nested execution, so one query can map to multiple executions in the UI. This
lifts the major restriction from #18604, and we no longer need to take the input
`query` as a child of the `InsertIntoXXX` commands.

So the fix is simple: this PR partially reverts #18064 and makes the
`InsertIntoXXX` commands leaf nodes again.
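
In essence (a simplified sketch of the shape of the change; see the diff below for
the exact code), the input `query` is kept as an inner child of the write commands,
and `FileFormatWriter` reads the schema from the analyzed plan rather than from the
physical plan:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

trait DataWritingCommand extends RunnableCommand {
  // The analyzed input plan that produces the data to be written.
  def query: LogicalPlan

  // An inner child (not a child), so the optimizer cannot rewrite it and
  // silently change the case of the output attribute names.
  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
}

// FileFormatWriter.write now takes the whole QueryExecution and picks the
// output attributes from the analyzed plan:
//   val allColumns = queryExecution.analyzed.output
```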

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenc...@databricks.com>

Closes #19474 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/274f0efe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/274f0efe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/274f0efe

Branch: refs/heads/master
Commit: 274f0efefa0c063649bccddb787e8863910f4366
Parents: ccdf21f
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Oct 12 20:20:44 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Oct 12 20:20:44 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/plans/QueryPlan.scala    |  2 +-
 .../sql/catalyst/plans/logical/Command.scala    |  3 +-
 .../spark/sql/execution/QueryExecution.scala    |  4 +--
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../execution/columnar/InMemoryRelation.scala   |  3 +-
 .../columnar/InMemoryTableScanExec.scala        |  2 +-
 .../execution/command/DataWritingCommand.scala  | 13 +++++++++
 .../InsertIntoDataSourceDirCommand.scala        |  6 ++--
 .../spark/sql/execution/command/cache.scala     |  2 +-
 .../spark/sql/execution/command/commands.scala  | 30 ++++++--------------
 .../command/createDataSourceTables.scala        |  2 +-
 .../spark/sql/execution/command/views.scala     |  4 +--
 .../sql/execution/datasources/DataSource.scala  | 13 ++++++++-
 .../datasources/FileFormatWriter.scala          | 14 +++++----
 .../InsertIntoDataSourceCommand.scala           |  2 +-
 .../InsertIntoHadoopFsRelationCommand.scala     |  9 ++----
 .../datasources/SaveIntoDataSourceCommand.scala |  2 +-
 .../execution/streaming/FileStreamSink.scala    |  2 +-
 .../datasources/FileFormatWriterSuite.scala     | 16 ++++++++++-
 .../execution/InsertIntoHiveDirCommand.scala    | 10 ++-----
 .../hive/execution/InsertIntoHiveTable.scala    | 14 ++-------
 .../sql/hive/execution/SaveAsHiveFile.scala     |  6 ++--
 22 files changed, 85 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 7addbaa..c7952e3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -178,7 +178,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     })
   }
 
-  override def innerChildren: Seq[QueryPlan[_]] = subqueries
+  override protected def innerChildren: Seq[QueryPlan[_]] = subqueries
 
   /**
   * Returns a plan where a best effort attempt has been made to transform `this` in a way

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
index ec5766e..38f4708 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Command.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
  * commands can be used by parsers to represent DDL operations.  Commands, unlike queries, are
  * eagerly executed.
  */
-trait Command extends LogicalPlan {
+trait Command extends LeafNode {
   override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 4accf54..f404621 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -119,7 +119,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
    * `SparkSQLDriver` for CLI applications.
    */
   def hiveResultString(): Seq[String] = executedPlan match {
-    case ExecutedCommandExec(desc: DescribeTableCommand, _) =>
+    case ExecutedCommandExec(desc: DescribeTableCommand) =>
       // If it is a describe command for a Hive table, we want to have the output format
       // be similar with Hive.
       desc.run(sparkSession).map {
@@ -130,7 +130,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
             .mkString("\t")
       }
     // SHOW TABLES in Hive only output table names, while ours output database, table name, isTemp.
-    case command @ ExecutedCommandExec(s: ShowTablesCommand, _) if !s.isExtended =>
+    case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>
       command.executeCollect().map(_.getString(1))
     case other =>
       val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4cdcc73..19b858f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -364,7 +364,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case r: RunnableCommand => ExecutedCommandExec(r, r.children.map(planLater)) :: Nil
+      case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
 
       case MemoryPlan(sink, output) =>
         val encoder = RowEncoder(sink.schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index bc98d8d..a1c62a7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -62,7 +62,8 @@ case class InMemoryRelation(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
     val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {
-  override def innerChildren: Seq[SparkPlan] = Seq(child)
+
+  override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
   override def producedAttributes: AttributeSet = outputSet
 

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index c7ddec5..af3636a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -34,7 +34,7 @@ case class InMemoryTableScanExec(
     @transient relation: InMemoryRelation)
   extends LeafExecNode {
 
-  override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 4e1c5e4..2cf0698 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration
@@ -30,6 +31,18 @@ import org.apache.spark.util.SerializableConfiguration
  */
 trait DataWritingCommand extends RunnableCommand {
 
+  /**
+   * The input query plan that produces the data to be written.
+   */
+  def query: LogicalPlan
+
+  // We make the input `query` an inner child instead of a child in order to hide it from the
+  // optimizer. This is because optimizer may not preserve the output schema names' case, and we
+  // have to keep the original analyzed plan here so that we can pass the corrected schema to the
+  // writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
+  // it when writing.
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
+
   override lazy val metrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
     Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
index 633de4c..9e35190 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -21,7 +21,6 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources._
 
 /**
@@ -45,10 +44,9 @@ case class InsertIntoDataSourceDirCommand(
     query: LogicalPlan,
     overwrite: Boolean) extends RunnableCommand {
 
-  override def children: Seq[LogicalPlan] = Seq(query)
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
-  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
-    assert(children.length == 1)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(storage.locationUri.nonEmpty, "Directory path is required")
     assert(provider.nonEmpty, "Data source is required")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 792290b..140f920 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -30,7 +30,7 @@ case class CacheTableCommand(
   require(plan.isEmpty || tableIdent.database.isEmpty,
     "Database name is not allowed in CACHE TABLE AS SELECT")
 
-  override def innerChildren: Seq[QueryPlan[_]] = plan.toSeq
+  override protected def innerChildren: Seq[QueryPlan[_]] = plan.toSeq
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 7cd4bae..e28b5eb2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -37,19 +37,13 @@ import org.apache.spark.sql.types._
  * A logical command that is executed for its side-effects.  `RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
-trait RunnableCommand extends logical.Command {
+trait RunnableCommand extends Command {
 
   // The map used to record the metrics of running the command. This will be passed to
   // `ExecutedCommand` during query planning.
   lazy val metrics: Map[String, SQLMetric] = Map.empty
 
-  def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
-    throw new NotImplementedError
-  }
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    throw new NotImplementedError
-  }
+  def run(sparkSession: SparkSession): Seq[Row]
 }
 
 /**
@@ -57,9 +51,8 @@ trait RunnableCommand extends logical.Command {
  * saves the result to prevent multiple executions.
  *
  * @param cmd the `RunnableCommand` this operator will run.
- * @param children the children physical plans ran by the `RunnableCommand`.
  */
-case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {
+case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
 
   override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
 
@@ -74,19 +67,14 @@ case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) e
    */
   protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
     val converter = CatalystTypeConverters.createToCatalystConverter(schema)
-    val rows = if (children.isEmpty) {
-      cmd.run(sqlContext.sparkSession)
-    } else {
-      cmd.run(sqlContext.sparkSession, children)
-    }
-    rows.map(converter(_).asInstanceOf[InternalRow])
+    cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
   }
 
-  override def innerChildren: Seq[QueryPlan[_]] = cmd.innerChildren
+  override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
 
   override def output: Seq[Attribute] = cmd.output
 
-  override def nodeName: String = cmd.nodeName
+  override def nodeName: String = "Execute " + cmd.nodeName
 
   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 04b2534..9e39079 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -120,7 +120,7 @@ case class CreateDataSourceTableAsSelectCommand(
     query: LogicalPlan)
   extends RunnableCommand {
 
-  override def innerChildren: Seq[LogicalPlan] = Seq(query)
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index ffdfd52..5172f32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -98,7 +98,7 @@ case class CreateViewCommand(
 
   import ViewHelper._
 
-  override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
   if (viewType == PersistedView) {
     require(originalText.isDefined, "'originalText' must be provided to create permanent view")
@@ -267,7 +267,7 @@ case class AlterViewAsCommand(
 
   import ViewHelper._
 
-  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(session: SparkSession): Seq[Row] = {
     // If the plan cannot be analyzed, throw an exception and don't proceed.

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b9502a9..b43d282 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -453,6 +453,17 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
 
+
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid to analyze an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val partitionAttributes = partitionColumns.map { name =>
+      data.output.find(a => equality(a.name, name)).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+      }
+    }
+
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
         case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
@@ -465,7 +476,7 @@ case class DataSource(
       outputPath = outputPath,
       staticPartitions = Map.empty,
       ifPartitionNotExists = false,
-      partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted),
+      partitionColumns = partitionAttributes,
       bucketSpec = bucketSpec,
       fileFormat = format,
       options = options,

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 5149697..75b1695 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, _}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{QueryExecution, SortExec, SQLExecution}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -101,7 +101,7 @@ object FileFormatWriter extends Logging {
    */
   def write(
       sparkSession: SparkSession,
-      plan: SparkPlan,
+      queryExecution: QueryExecution,
       fileFormat: FileFormat,
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
@@ -117,7 +117,9 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-    val allColumns = plan.output
+    // Pick the attributes from analyzed plan, as optimizer may not preserve the output schema
+    // names' case.
+    val allColumns = queryExecution.analyzed.output
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = allColumns.filterNot(partitionSet.contains)
 
@@ -158,7 +160,7 @@ object FileFormatWriter extends Logging {
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
     // the sort order doesn't matter
-    val actualOrdering = plan.outputOrdering.map(_.child)
+    val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
@@ -176,12 +178,12 @@ object FileFormatWriter extends Logging {
 
     try {
       val rdd = if (orderingMatched) {
-        plan.execute()
+        queryExecution.toRdd
       } else {
         SortExec(
           requiredOrdering.map(SortOrder(_, Ascending)),
           global = false,
-          child = plan).execute()
+          child = queryExecution.executedPlan).execute()
       }
       val ret = new Array[WriteTaskResult](rdd.partitions.length)
       sparkSession.sparkContext.runJob(

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 08b2f4f..a813829 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -33,7 +33,7 @@ case class InsertIntoDataSourceCommand(
     overwrite: Boolean)
   extends RunnableCommand {
 
-  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 64e5a57..675bee8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -57,11 +56,7 @@ case class InsertIntoHadoopFsRelationCommand(
   extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
-  override def children: Seq[LogicalPlan] = query :: Nil
-
-  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
-    assert(children.length == 1)
-
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
     SchemaUtils.checkSchemaColumnNameDuplication(
       query.schema,
@@ -144,7 +139,7 @@ case class InsertIntoHadoopFsRelationCommand(
       val updatedPartitionPaths =
         FileFormatWriter.write(
           sparkSession = sparkSession,
-          plan = children.head,
+          queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
           fileFormat = fileFormat,
           committer = committer,
           outputSpec = FileFormatWriter.OutputSpec(

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
index 5eb6a84..96c84ea 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
@@ -38,7 +38,7 @@ case class SaveIntoDataSourceCommand(
     options: Map[String, String],
     mode: SaveMode) extends RunnableCommand {
 
-  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     dataSource.createRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 72e5ac4..6bd0696 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -121,7 +121,7 @@ class FileStreamSink(
 
       FileFormatWriter.write(
         sparkSession = sparkSession,
-        plan = data.queryExecution.executedPlan,
+        queryExecution = data.queryExecution,
         fileFormat = fileFormat,
         committer = committer,
         outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index a0c1ea6..6f8767d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
 
   test("empty file should be skipped while write to file") {
     withTempPath { path =>
@@ -30,4 +31,17 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
       assert(partFiles.length === 2)
     }
   }
+
+  test("FileFormatWriter should respect the input query schema") {
+    withTable("t1", "t2", "t3", "t4") {
+      spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
+      spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")
+      checkAnswer(spark.table("t2"), Row(0, 0))
+
+      // Test picking part of the columns when writing.
+      spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")
+      spark.sql("select COL1, COL2 from t3").write.saveAsTable("t4")
+      checkAnswer(spark.table("t4"), Row(0, 0))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 918c8be..1c6f8dd 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -27,11 +27,10 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.client.HiveClientImpl
 
 /**
@@ -57,10 +56,7 @@ case class InsertIntoHiveDirCommand(
     query: LogicalPlan,
     overwrite: Boolean) extends SaveAsHiveFile {
 
-  override def children: Seq[LogicalPlan] = query :: Nil
-
-  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
-    assert(children.length == 1)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(storage.locationUri.nonEmpty)
 
     val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
@@ -102,7 +98,7 @@ case class InsertIntoHiveDirCommand(
     try {
       saveAsHiveFile(
         sparkSession = sparkSession,
-        plan = children.head,
+        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
         hadoopConf = hadoopConf,
         fileSinkConf = fileSinkConf,
         outputLocation = tmpPath.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index e5b59ed..56e10bc 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,20 +17,16 @@
 
 package org.apache.spark.sql.hive.execution
 
-import scala.util.control.NonFatal
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl
 
@@ -72,16 +68,12 @@ case class InsertIntoHiveTable(
     overwrite: Boolean,
     ifPartitionNotExists: Boolean) extends SaveAsHiveFile {
 
-  override def children: Seq[LogicalPlan] = query :: Nil
-
   /**
   * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
    * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
    */
-  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
-    assert(children.length == 1)
-
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
@@ -170,7 +162,7 @@ case class InsertIntoHiveTable(
 
     saveAsHiveFile(
       sparkSession = sparkSession,
-      plan = children.head,
+      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
       hadoopConf = hadoopConf,
       fileSinkConf = fileSinkConf,
       outputLocation = tmpLocation.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/274f0efe/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 2d74ef0..6365759 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -47,7 +47,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
 
   protected def saveAsHiveFile(
       sparkSession: SparkSession,
-      plan: SparkPlan,
+      queryExecution: QueryExecution,
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
@@ -75,7 +75,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
 
     FileFormatWriter.write(
       sparkSession = sparkSession,
-      plan = plan,
+      queryExecution = queryExecution,
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
       outputSpec = FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations),

