Repository: spark
Updated Branches:
  refs/heads/branch-2.3 31e46ec60 -> 9db81fd86


[SPARK-25313][BRANCH-2.3][SQL] Fix regression in FileFormatWriter output names

Port https://github.com/apache/spark/pull/22320 to branch-2.3
## What changes were proposed in this pull request?

Let's look at the following example:
```
        val location = "/tmp/t"
        val df = spark.range(10).toDF("id")
        df.write.format("parquet").saveAsTable("tbl")
        spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
        spark.sql(s"CREATE TABLE tbl2(ID long) USING parquet location 
$location")
        spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
        println(spark.read.parquet(location).schema)
        spark.table("tbl2").show()
```
The output column name in the written schema will be `id` instead of `ID`, so the last 
query shows nothing from `tbl2` (the table schema declares `ID`, but the Parquet files 
were written with the field name `id`).
With debug logging enabled we can see that the output name is changed from `ID` to 
`id`: the `outputColumns` of `InsertIntoHadoopFsRelationCommand` are rewritten by the 
`RemoveRedundantAliases` rule.
![wechatimg5](https://user-images.githubusercontent.com/1097932/44947871-6299f200-ae46-11e8-9c96-d45fe368206c.jpeg)

![wechatimg4](https://user-images.githubusercontent.com/1097932/44947866-56ae3000-ae46-11e8-8923-8b3bbe060075.jpeg)

**To guarantee correctness**, we should change the output columns from 
`Seq[Attribute]` to `Seq[String]`, so that the optimizer cannot replace their names.
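
The patch below implements this with a `DataWritingCommand.logicalPlanOutputWithNames` helper that re-applies the stored names to the analyzed plan's output. A minimal sketch of the idea (the enclosing `OutputNameHelper` object is just an illustrative name; the real helper lives on the `DataWritingCommand` companion object in the diff):
```
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object OutputNameHelper {
  // The command stores only the output column names (Seq[String]); the
  // attributes are re-derived from the analyzed plan whenever they are
  // needed, so optimizer rewrites cannot change the user-visible names.
  def logicalPlanOutputWithNames(query: LogicalPlan, names: Seq[String]): Seq[Attribute] = {
    val outputAttributes = query.output
    assert(outputAttributes.length == names.length,
      "The length of provided names doesn't match the length of output attributes.")
    // withName keeps the exprId, so each attribute still binds to the plan;
    // only its user-visible name is restored.
    outputAttributes.zip(names).map { case (attr, name) => attr.withName(name) }
  }
}
```
Because a plain `Seq[String]` is opaque to the optimizer, the original case of the names survives even when `RemoveRedundantAliases` replaces the underlying attributes.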

I will fix the project-elimination related rules in 
https://github.com/apache/spark/pull/22311 after this one.

## How was this patch tested?

Unit test.

Closes #22346 from gengliangwang/portSchemaOutputName2.3.

Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9db81fd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9db81fd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9db81fd8

Branch: refs/heads/branch-2.3
Commit: 9db81fd864dcc97bed2bf5bd2028787c3f07a6d0
Parents: 31e46ec
Author: Gengliang Wang <gengliang.w...@databricks.com>
Authored: Thu Sep 6 23:02:55 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Sep 6 23:02:55 2018 +0800

----------------------------------------------------------------------
 .../execution/command/DataWritingCommand.scala  | 43 +++++++++++-
 .../command/createDataSourceTables.scala        |  4 +-
 .../sql/execution/datasources/DataSource.scala  | 16 +++--
 .../datasources/DataSourceStrategy.scala        |  4 +-
 .../InsertIntoHadoopFsRelationCommand.scala     |  6 +-
 .../sql/test/DataFrameReaderWriterSuite.scala   | 74 ++++++++++++++++++++
 .../apache/spark/sql/hive/HiveStrategies.scala  |  6 +-
 .../CreateHiveTableAsSelectCommand.scala        |  9 +--
 .../execution/InsertIntoHiveDirCommand.scala    |  2 +-
 .../hive/execution/InsertIntoHiveTable.scala    |  2 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 48 +++++++++++++
 11 files changed, 189 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index e11dbd2..0a185b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -41,8 +42,12 @@ trait DataWritingCommand extends Command {
 
   override final def children: Seq[LogicalPlan] = query :: Nil
 
-  // Output columns of the analyzed input query plan
-  def outputColumns: Seq[Attribute]
+  // Output column names of the analyzed input query plan.
+  def outputColumnNames: Seq[String]
+
+  // Output columns of the analyzed input query plan.
+  def outputColumns: Seq[Attribute] =
+    DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames)
 
   lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
 
@@ -53,3 +58,35 @@ trait DataWritingCommand extends Command {
 
   def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
 }
+
+object DataWritingCommand {
+  /**
+   * Returns output attributes with provided names.
+   * The length of provided names should be the same of the length of [[LogicalPlan.output]].
+   */
+  def logicalPlanOutputWithNames(
+      query: LogicalPlan,
+      names: Seq[String]): Seq[Attribute] = {
+    // Save the output attributes to a variable to avoid duplicated function calls.
+    val outputAttributes = query.output
+    assert(outputAttributes.length == names.length,
+      "The length of provided names doesn't match the length of output 
attributes.")
+    outputAttributes.zip(names).map { case (attr, outputName) =>
+      attr.withName(outputName)
+    }
+  }
+
+  /**
+   * Returns schema of logical plan with provided names.
+   * The length of provided names should be the same of the length of [[LogicalPlan.schema]].
+   */
+  def logicalPlanSchemaWithNames(
+      query: LogicalPlan,
+      names: Seq[String]): StructType = {
+    assert(query.schema.length == names.length,
+      "The length of provided names doesn't match the length of query schema.")
+    StructType(query.schema.zip(names).map { case (structField, outputName) =>
+      structField.copy(name = outputName)
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index e974776..93ac174 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -139,7 +139,7 @@ case class CreateDataSourceTableAsSelectCommand(
     table: CatalogTable,
     mode: SaveMode,
     query: LogicalPlan,
-    outputColumns: Seq[Attribute])
+    outputColumnNames: Seq[String])
   extends DataWritingCommand {
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
@@ -213,7 +213,7 @@ case class CreateDataSourceTableAsSelectCommand(
       catalogTable = if (tableExists) Some(table) else None)
 
     try {
-      dataSource.writeAndRead(mode, query, outputColumns, physicalPlan)
+      dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan)
     } catch {
       case ex: AnalysisException =>
         logError(s"Failed to write to table 
${table.identifier.unquotedString}", ex)

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 6e1b572..f4aebef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -479,7 +480,7 @@ case class DataSource(
       mode = mode,
       catalogTable = catalogTable,
       fileIndex = fileIndex,
-      outputColumns = data.output)
+      outputColumnNames = data.output.map(_.name))
   }
 
   /**
@@ -489,9 +490,9 @@ case class DataSource(
    * @param mode The save mode for this writing.
   * @param data The input query plan that produces the data to be written. Note that this plan
    *             is analyzed and optimized.
-   * @param outputColumns The original output columns of the input query plan. The optimizer may not
-   *                      preserve the output column's names' case, so we need this parameter
-   *                      instead of `data.output`.
+   * @param outputColumnNames The original output column names of the input query plan. The
+   *                          optimizer may not preserve the output column's names' case, so we need
+   *                          this parameter instead of `data.output`.
   * @param physicalPlan The physical plan of the input query plan. We should run the writing
   *                     command with this physical plan instead of creating a new physical plan,
   *                     so that the metrics can be correctly linked to the given physical plan and
@@ -500,8 +501,9 @@ case class DataSource(
   def writeAndRead(
       mode: SaveMode,
       data: LogicalPlan,
-      outputColumns: Seq[Attribute],
+      outputColumnNames: Seq[String],
       physicalPlan: SparkPlan): BaseRelation = {
+    val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames)
     if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
@@ -524,7 +526,9 @@ case class DataSource(
               s"Unable to resolve $name given 
[${data.output.map(_.name).mkString(", ")}]")
           }
         }
-        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
+        val resolved = cmd.copy(
+          partitionColumns = resolvedPartCols,
+          outputColumnNames = outputColumnNames)
         resolved.run(sparkSession, physicalPlan)
        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
        copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 3f41612..74aa9a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -139,7 +139,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
     case CreateTable(tableDesc, mode, Some(query))
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
       DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema))
-      CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output)
+      CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))
 
     case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
         parts, query, overwrite, false) if parts.isEmpty =>
@@ -209,7 +209,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         mode,
         table,
         Some(t.location),
-        actualQuery.output)
+        actualQuery.output.map(_.name))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index dd7ef0d..3cc4a88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -55,14 +55,14 @@ case class InsertIntoHadoopFsRelationCommand(
     mode: SaveMode,
     catalogTable: Option[CatalogTable],
     fileIndex: Option[FileIndex],
-    outputColumns: Seq[Attribute])
+    outputColumnNames: Seq[String])
   extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so lets not allow that
-    SchemaUtils.checkSchemaColumnNameDuplication(
-      query.schema,
+    SchemaUtils.checkColumnNameDuplication(
+      outputColumnNames,
       s"when inserting into $outputPath",
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index b3147b0..5c06551 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -794,6 +794,80 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
+  test("Insert overwrite table command should output correct schema: basic") {
+    withTable("tbl", "tbl2") {
+      withView("view1") {
+        val df = spark.range(10).toDF("id")
+        df.write.format("parquet").saveAsTable("tbl")
+        spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+        spark.sql("CREATE TABLE tbl2(ID long) USING parquet")
+        spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
+        val identifier = TableIdentifier("tbl2")
+        val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString
+        val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+        assert(spark.read.parquet(location).schema == expectedSchema)
+        checkAnswer(spark.table("tbl2"), df)
+      }
+    }
+  }
+
+  test("Insert overwrite table command should output correct schema: complex") 
{
+    withTable("tbl", "tbl2") {
+      withView("view1") {
+        val df = spark.range(10).map(x => (x, x.toInt, x.toInt)).toDF("col1", "col2", "col3")
+        df.write.format("parquet").saveAsTable("tbl")
+        spark.sql("CREATE VIEW view1 AS SELECT * FROM tbl")
+        spark.sql("CREATE TABLE tbl2(COL1 long, COL2 int, COL3 int) USING 
parquet PARTITIONED " +
+          "BY (COL2) CLUSTERED BY (COL3) INTO 3 BUCKETS")
+        spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 FROM 
view1")
+        val identifier = TableIdentifier("tbl2")
+        val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString
+        val expectedSchema = StructType(Seq(
+          StructField("COL1", LongType, true),
+          StructField("COL3", IntegerType, true),
+          StructField("COL2", IntegerType, true)))
+        assert(spark.read.parquet(location).schema == expectedSchema)
+        checkAnswer(spark.table("tbl2"), df)
+      }
+    }
+  }
+
+  test("Create table as select command should output correct schema: basic") {
+    withTable("tbl", "tbl2") {
+      withView("view1") {
+        val df = spark.range(10).toDF("id")
+        df.write.format("parquet").saveAsTable("tbl")
+        spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+        spark.sql("CREATE TABLE tbl2 USING parquet AS SELECT ID FROM view1")
+        val identifier = TableIdentifier("tbl2")
+        val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString
+        val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+        assert(spark.read.parquet(location).schema == expectedSchema)
+        checkAnswer(spark.table("tbl2"), df)
+      }
+    }
+  }
+
+  test("Create table as select command should output correct schema: complex") 
{
+    withTable("tbl", "tbl2") {
+      withView("view1") {
+        val df = spark.range(10).map(x => (x, x.toInt, x.toInt)).toDF("col1", "col2", "col3")
+        df.write.format("parquet").saveAsTable("tbl")
+        spark.sql("CREATE VIEW view1 AS SELECT * FROM tbl")
+        spark.sql("CREATE TABLE tbl2 USING parquet PARTITIONED BY (COL2) " +
+          "CLUSTERED BY (COL3) INTO 3 BUCKETS AS SELECT COL1, COL2, COL3 FROM 
view1")
+        val identifier = TableIdentifier("tbl2")
+        val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString
+        val expectedSchema = StructType(Seq(
+          StructField("COL1", LongType, true),
+          StructField("COL3", IntegerType, true),
+          StructField("COL2", IntegerType, true)))
+        assert(spark.read.parquet(location).schema == expectedSchema)
+        checkAnswer(spark.table("tbl2"), df)
+      }
+    }
+  }
+
   test("use Spark jobs to list files") {
     withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") {
       withTempDir { dir =>

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 8df05cb..fee6f00 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -149,7 +149,7 @@ object HiveAnalysis extends Rule[LogicalPlan] {
     case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
-        ifPartitionNotExists, query.output)
+        ifPartitionNotExists, query.output.map(_.name))
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
       DDLUtils.checkDataColNames(tableDesc)
@@ -157,14 +157,14 @@ object HiveAnalysis extends Rule[LogicalPlan] {
 
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
       DDLUtils.checkDataColNames(tableDesc)
-      CreateHiveTableAsSelectCommand(tableDesc, query, query.output, mode)
+      CreateHiveTableAsSelectCommand(tableDesc, query, query.output.map(_.name), mode)
 
     case InsertIntoDir(isLocal, storage, provider, child, overwrite)
         if DDLUtils.isHiveTable(provider) =>
       val outputPath = new Path(storage.locationUri.get)
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
 
-      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output)
+      InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output.map(_.name))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 1e801fe..1ff680f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommand
 case class CreateHiveTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
-    outputColumns: Seq[Attribute],
+    outputColumnNames: Seq[String],
     mode: SaveMode)
   extends DataWritingCommand {
 
@@ -63,13 +63,14 @@ case class CreateHiveTableAsSelectCommand(
         query,
         overwrite = false,
         ifPartitionNotExists = false,
-        outputColumns = outputColumns).run(sparkSession, child)
+        outputColumnNames = outputColumnNames).run(sparkSession, child)
     } else {
       // TODO ideally, we should get the output data ready first and then
      // add the relation into catalog, just in case of failure occurs while data
       // processing.
       assert(tableDesc.schema.isEmpty)
-      catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+      val schema = DataWritingCommand.logicalPlanSchemaWithNames(query, outputColumnNames)
+      catalog.createTable(tableDesc.copy(schema = schema), ignoreIfExists = false)
 
       try {
         // Read back the metadata of the table which was created just now.
@@ -82,7 +83,7 @@ case class CreateHiveTableAsSelectCommand(
           query,
           overwrite = true,
           ifPartitionNotExists = false,
-          outputColumns = outputColumns).run(sparkSession, child)
+          outputColumnNames = outputColumnNames).run(sparkSession, child)
       } catch {
         case NonFatal(e) =>
           // drop the created table.

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index cebeca0..0a73aaa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -57,7 +57,7 @@ case class InsertIntoHiveDirCommand(
     storage: CatalogStorageFormat,
     query: LogicalPlan,
     overwrite: Boolean,
-    outputColumns: Seq[Attribute]) extends SaveAsHiveFile {
+    outputColumnNames: Seq[String]) extends SaveAsHiveFile {
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     assert(storage.locationUri.nonEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 02a60f1..75a0563 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -69,7 +69,7 @@ case class InsertIntoHiveTable(
     query: LogicalPlan,
     overwrite: Boolean,
     ifPartitionNotExists: Boolean,
-    outputColumns: Seq[Attribute]) extends SaveAsHiveFile {
+    outputColumnNames: Seq[String]) extends SaveAsHiveFile {
 
   /**
   * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the

http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index db76ec9..7eaac85 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -751,6 +751,54 @@ class HiveDDLSuite
     }
   }
 
+  test("Insert overwrite Hive table should output correct schema") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") {
+      withTable("tbl", "tbl2") {
+        withView("view1") {
+          spark.sql("CREATE TABLE tbl(id long)")
+          spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
+          spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE tbl2(ID long) USING hive
+                |OPTIONS(fileFormat 'parquet')
+                |LOCATION '${path.toURI}'
+              """.stripMargin)
+            spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
+            val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+            assert(spark.read.parquet(path.toString).schema == expectedSchema)
+            checkAnswer(spark.table("tbl2"), Seq(Row(4)))
+          }
+        }
+      }
+    }
+  }
+
+  test("Create Hive table as select should output correct schema") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") {
+      withTable("tbl", "tbl2") {
+        withView("view1") {
+          spark.sql("CREATE TABLE tbl(id long)")
+          spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
+          spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE tbl2 USING hive
+                |OPTIONS(fileFormat 'parquet')
+                |LOCATION '${path.toURI}'
+                |AS SELECT ID FROM view1
+              """.stripMargin)
+            val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+            assert(spark.read.parquet(path.toString).schema == expectedSchema)
+            checkAnswer(spark.table("tbl2"), Seq(Row(4)))
+          }
+        }
+      }
+    }
+  }
+
   test("alter table partition - storage information") {
     sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width 
INT)")
     sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")

