Repository: spark
Updated Branches:
  refs/heads/master 67ea11ea0 -> d4f0b1d2c


[SPARK-22834][SQL] Make insertion commands have real children to fix UI issues

## What changes were proposed in this pull request?

With #19474,  children of insertion commands are missing in UI.
To fix it:
1. Create a new physical plan `DataWritingCommandExec` to exec 
`DataWritingCommand` with children.  So that the other commands won't be 
affected.
2. On creation of `DataWritingCommand`, a new field `allColumns` must be 
specified, which is the output of analyzed plan.
3. In `FileFormatWriter`, the output schema will use `allColumns` instead of 
the output of optimized plan.

Before code changes:
![2017-12-19 10 27 
10](https://user-images.githubusercontent.com/1097932/34161850-d2fd0acc-e50c-11e7-898a-177154fe7d8e.png)

After code changes:
![2017-12-19 10 27 
04](https://user-images.githubusercontent.com/1097932/34161865-de23de26-e50c-11e7-9131-0c32f7b7b749.png)

## How was this patch tested?
Unit test

Author: Wang Gengliang <ltn...@gmail.com>

Closes #20020 from gengliangwang/insert.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4f0b1d2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4f0b1d2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4f0b1d2

Branch: refs/heads/master
Commit: d4f0b1d2c546fd0a3141dfb31aeccad1b9788bf5
Parents: 67ea11e
Author: Wang Gengliang <ltn...@gmail.com>
Authored: Fri Dec 29 15:28:33 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Dec 29 15:28:33 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +--
 .../spark/sql/execution/SparkStrategies.scala   |  1 +
 .../execution/command/DataWritingCommand.scala  | 28 ++++++++-------
 .../spark/sql/execution/command/commands.scala  | 38 +++++++++++++++++++-
 .../sql/execution/datasources/DataSource.scala  | 16 ++-------
 .../datasources/DataSourceStrategy.scala        |  3 +-
 .../datasources/FileFormatWriter.scala          | 24 ++++++-------
 .../InsertIntoHadoopFsRelationCommand.scala     | 10 +++---
 .../execution/streaming/FileStreamSink.scala    |  5 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  5 +--
 .../execution/InsertIntoHiveDirCommand.scala    | 14 +++++---
 .../hive/execution/InsertIntoHiveTable.scala    | 16 +++++----
 .../sql/hive/execution/SaveAsHiveFile.scala     | 10 +++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 13 +++++++
 14 files changed, 123 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index bd216ff..3304f36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, 
InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
LogicalRelation}
@@ -264,7 +264,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
           sparkSession = df.sparkSession,
           className = source,
           partitionColumns = partitioningColumns.getOrElse(Nil),
-          options = extraOptions.toMap).planForWriting(mode, df.logicalPlan)
+          options = extraOptions.toMap).planForWriting(mode, 
AnalysisBarrier(df.logicalPlan))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8c6c324..6b3f301 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -403,6 +403,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case d: DataWritingCommand => DataWritingCommandExec(d, 
planLater(d.query)) :: Nil
       case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
 
       case MemoryPlan(sink, output) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 2cf0698..e56f810 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -20,30 +20,32 @@ package org.apache.spark.sql.execution.command
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.util.SerializableConfiguration
 
-
 /**
- * A special `RunnableCommand` which writes data out and updates metrics.
+ * A special `Command` which writes data out and updates metrics.
  */
-trait DataWritingCommand extends RunnableCommand {
-
+trait DataWritingCommand extends Command {
   /**
    * The input query plan that produces the data to be written.
+   * IMPORTANT: the input query plan MUST be analyzed, so that we can carry 
its output columns
+   *            to [[FileFormatWriter]].
    */
   def query: LogicalPlan
 
-  // We make the input `query` an inner child instead of a child in order to 
hide it from the
-  // optimizer. This is because optimizer may not preserve the output schema 
names' case, and we
-  // have to keep the original analyzed plan here so that we can pass the 
corrected schema to the
-  // writer. The schema of analyzed plan is what user expects(or specifies), 
so we should respect
-  // it when writing.
-  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
+  override final def children: Seq[LogicalPlan] = query :: Nil
 
-  override lazy val metrics: Map[String, SQLMetric] = {
+  // Output columns of the analyzed input query plan
+  def outputColumns: Seq[Attribute]
+
+  lazy val metrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
     Map(
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written 
files"),
@@ -57,4 +59,6 @@ trait DataWritingCommand extends RunnableCommand {
     val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
     new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
   }
+
+  def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index e28b5eb2..2cc0e38 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata}
@@ -88,6 +88,42 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends 
LeafExecNode {
 }
 
 /**
+ * A physical operator that executes the run method of a `DataWritingCommand` 
and
+ * saves the result to prevent multiple executions.
+ *
+ * @param cmd the `DataWritingCommand` this operator will run.
+ * @param child the physical plan child ran by the `DataWritingCommand`.
+ */
+case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
+  extends SparkPlan {
+
+  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    val rows = cmd.run(sqlContext.sparkSession, child)
+
+    rows.map(converter(_).asInstanceOf[InternalRow])
+  }
+
+  override def children: Seq[SparkPlan] = child :: Nil
+
+  override def output: Seq[Attribute] = cmd.output
+
+  override def nodeName: String = "Execute " + cmd.nodeName
+
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
+
+  override def executeToIterator: Iterator[InternalRow] = 
sideEffectResult.toIterator
+
+  override def executeTake(limit: Int): Array[InternalRow] = 
sideEffectResult.take(limit).toArray
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }
+}
+
+/**
  * An explain command for users to see how a command will be executed.
  *
  * Note that this command takes in a logical plan, runs the optimizer on the 
logical plan

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b676672..25e1210 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -456,17 +456,6 @@ case class DataSource(
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, 
caseSensitive)
 
-
-    // SPARK-17230: Resolve the partition columns so 
InsertIntoHadoopFsRelationCommand does
-    // not need to have the query as child, to avoid to analyze an optimized 
query,
-    // because InsertIntoHadoopFsRelationCommand will be optimized first.
-    val partitionAttributes = partitionColumns.map { name =>
-      data.output.find(a => equality(a.name, name)).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given 
[${data.output.map(_.name).mkString(", ")}]")
-      }
-    }
-
     val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
       sparkSession.table(tableIdent).queryExecution.analyzed.collect {
         case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location
@@ -479,14 +468,15 @@ case class DataSource(
       outputPath = outputPath,
       staticPartitions = Map.empty,
       ifPartitionNotExists = false,
-      partitionColumns = partitionAttributes,
+      partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted),
       bucketSpec = bucketSpec,
       fileFormat = format,
       options = options,
       query = data,
       mode = mode,
       catalogTable = catalogTable,
-      fileIndex = fileIndex)
+      fileIndex = fileIndex,
+      outputColumns = data.output)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 400f2e0..d94c5bb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -208,7 +208,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends 
Rule[LogicalPlan] with Cast
         actualQuery,
         mode,
         table,
-        Some(t.location))
+        Some(t.location),
+        actualQuery.output)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 1fac01a..1d80a69 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -39,7 +39,7 @@ import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, _}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.{QueryExecution, SortExec, SQLExecution}
+import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -56,7 +56,9 @@ object FileFormatWriter extends Logging {
 
   /** Describes how output files should be placed in the filesystem. */
   case class OutputSpec(
-    outputPath: String, customPartitionLocations: Map[TablePartitionSpec, 
String])
+    outputPath: String,
+    customPartitionLocations: Map[TablePartitionSpec, String],
+    outputColumns: Seq[Attribute])
 
   /** A shared job description for all the write tasks. */
   private class WriteJobDescription(
@@ -101,7 +103,7 @@ object FileFormatWriter extends Logging {
    */
   def write(
       sparkSession: SparkSession,
-      queryExecution: QueryExecution,
+      plan: SparkPlan,
       fileFormat: FileFormat,
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
@@ -117,11 +119,8 @@ object FileFormatWriter extends Logging {
     job.setOutputValueClass(classOf[InternalRow])
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
-    // Pick the attributes from analyzed plan, as optimizer may not preserve 
the output schema
-    // names' case.
-    val allColumns = queryExecution.analyzed.output
     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = allColumns.filterNot(partitionSet.contains)
+    val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
 
     val bucketIdExpression = bucketSpec.map { spec =>
       val bucketColumns = spec.bucketColumnNames.map(c => 
dataColumns.find(_.name == c).get)
@@ -144,7 +143,7 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new 
SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = allColumns,
+      allColumns = outputSpec.outputColumns,
       dataColumns = dataColumns,
       partitionColumns = partitionColumns,
       bucketIdExpression = bucketIdExpression,
@@ -160,7 +159,7 @@ object FileFormatWriter extends Logging {
     // We should first sort by partition columns, then bucket id, and finally 
sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ 
sortColumns
     // the sort order doesn't matter
-    val actualOrdering = 
queryExecution.executedPlan.outputOrdering.map(_.child)
+    val actualOrdering = plan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) 
{
       false
     } else {
@@ -178,17 +177,18 @@ object FileFormatWriter extends Logging {
 
     try {
       val rdd = if (orderingMatched) {
-        queryExecution.toRdd
+        plan.execute()
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from 
analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer 
removing some
         // aliases. Here we bind the expression ahead to avoid potential 
attribute ids mismatch.
         val orderingExpr = requiredOrdering
-          .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, 
allColumns))
+          .map(SortOrder(_, Ascending))
+          .map(BindReferences.bindReference(_, outputSpec.outputColumns))
         SortExec(
           orderingExpr,
           global = false,
-          child = queryExecution.executedPlan).execute()
+          child = plan).execute()
       }
       val ret = new Array[WriteTaskResult](rdd.partitions.length)
       sparkSession.sparkContext.runJob(

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 675bee8..ad24e28 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogTable, CatalogT
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -52,11 +53,12 @@ case class InsertIntoHadoopFsRelationCommand(
     query: LogicalPlan,
     mode: SaveMode,
     catalogTable: Option[CatalogTable],
-    fileIndex: Option[FileIndex])
+    fileIndex: Option[FileIndex],
+    outputColumns: Seq[Attribute])
   extends DataWritingCommand {
   import 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow 
that
     SchemaUtils.checkSchemaColumnNameDuplication(
       query.schema,
@@ -139,11 +141,11 @@ case class InsertIntoHadoopFsRelationCommand(
       val updatedPartitionPaths =
         FileFormatWriter.write(
           sparkSession = sparkSession,
-          queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+          plan = child,
           fileFormat = fileFormat,
           committer = committer,
           outputSpec = FileFormatWriter.OutputSpec(
-            qualifiedOutputPath.toString, customPartitionLocations),
+            qualifiedOutputPath.toString, customPartitionLocations, 
outputColumns),
           hadoopConf = hadoopConf,
           partitionColumns = partitionColumns,
           bucketSpec = bucketSpec,

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 6bd0696..2715fa9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -118,13 +118,14 @@ class FileStreamSink(
           throw new RuntimeException(s"Partition column $col not found in 
schema ${data.schema}")
         }
       }
+      val qe = data.queryExecution
 
       FileFormatWriter.write(
         sparkSession = sparkSession,
-        queryExecution = data.queryExecution,
+        plan = qe.executedPlan,
         fileFormat = fileFormat,
         committer = committer,
-        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
+        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, 
qe.analyzed.output),
         hadoopConf = hadoopConf,
         partitionColumns = partitionColumns,
         bucketSpec = None,

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index a7961c7..ab857b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -148,7 +148,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, 
ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
-      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, 
ifPartitionNotExists)
+      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
+        ifPartitionNotExists, query.output)
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) 
=>
       DDLUtils.checkDataColNames(tableDesc)
@@ -163,7 +164,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       val outputPath = new Path(storage.locationUri.get)
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
 
-      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite)
+      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, 
child.output)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 1c6f8dd..cebeca0 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -27,10 +27,12 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.client.HiveClientImpl
 
 /**
@@ -54,9 +56,10 @@ case class InsertIntoHiveDirCommand(
     isLocal: Boolean,
     storage: CatalogStorageFormat,
     query: LogicalPlan,
-    overwrite: Boolean) extends SaveAsHiveFile {
+    overwrite: Boolean,
+    outputColumns: Seq[Attribute]) extends SaveAsHiveFile {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(storage.locationUri.nonEmpty)
 
     val hiveTable = HiveClientImpl.toHiveTable(CatalogTable(
@@ -98,10 +101,11 @@ case class InsertIntoHiveDirCommand(
     try {
       saveAsHiveFile(
         sparkSession = sparkSession,
-        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+        plan = child,
         hadoopConf = hadoopConf,
         fileSinkConf = fileSinkConf,
-        outputLocation = tmpPath.toString)
+        outputLocation = tmpPath.toString,
+        allColumns = outputColumns)
 
       val fs = writeToPath.getFileSystem(hadoopConf)
       if (overwrite && fs.exists(writeToPath)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b46addb..3ce5b84 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -23,10 +23,11 @@ import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -67,14 +68,15 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean) extends SaveAsHiveFile {
+    ifPartitionNotExists: Boolean,
+    outputColumns: Seq[Attribute]) extends SaveAsHiveFile {
 
   /**
    * Inserts all the rows in the table into Hive.  Row objects are properly 
serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
    * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
    */
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
@@ -94,7 +96,7 @@ case class InsertIntoHiveTable(
     val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, 
tableLocation)
 
     try {
-      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, 
tmpLocation)
+      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, 
tmpLocation, child)
     } finally {
       // Attempt to delete the staging directory and the inclusive files. If 
failed, the files are
       // expected to be dropped at the normal termination of VM since 
deleteOnExit is used.
@@ -119,7 +121,8 @@ case class InsertIntoHiveTable(
       externalCatalog: ExternalCatalog,
       hadoopConf: Configuration,
       tableDesc: TableDesc,
-      tmpLocation: Path): Unit = {
+      tmpLocation: Path,
+      child: SparkPlan): Unit = {
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
 
     val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -191,10 +194,11 @@ case class InsertIntoHiveTable(
 
     saveAsHiveFile(
       sparkSession = sparkSession,
-      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+      plan = child,
       hadoopConf = hadoopConf,
       fileSinkConf = fileSinkConf,
       outputLocation = tmpLocation.toString,
+      allColumns = outputColumns,
       partitionAttributes = partitionAttributes)
 
     if (partition.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index 6365759..9a6607f 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -47,10 +47,11 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 
   protected def saveAsHiveFile(
       sparkSession: SparkSession,
-      queryExecution: QueryExecution,
+      plan: SparkPlan,
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
+      allColumns: Seq[Attribute],
       customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
       partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
@@ -75,10 +76,11 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 
     FileFormatWriter.write(
       sparkSession = sparkSession,
-      queryExecution = queryExecution,
+      plan = plan,
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
-      outputSpec = FileFormatWriter.OutputSpec(outputLocation, 
customPartitionLocations),
+      outputSpec =
+        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, 
allColumns),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,

http://git-wip-us.apache.org/repos/asf/spark/blob/d4f0b1d2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6c11905..f2e0c69 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2150,4 +2150,17 @@ class HiveDDLSuite
       assert(e.message.contains("LOAD DATA input path does not exist"))
     }
   }
+
+  test("SPARK-22252: FileFormatWriter should respect the input query schema in 
HIVE") {
+    withTable("t1", "t2", "t3", "t4") {
+      spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
+      spark.sql("select COL1, COL2 from 
t1").write.format("hive").saveAsTable("t2")
+      checkAnswer(spark.table("t2"), Row(0, 0))
+
+      // Test picking part of the columns when writing.
+      spark.range(1).select('id, 'id as 'col1, 'id as 
'col2).write.saveAsTable("t3")
+      spark.sql("select COL1, COL2 from 
t3").write.format("hive").saveAsTable("t4")
+      checkAnswer(spark.table("t4"), Row(0, 0))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to