Repository: spark Updated Branches: refs/heads/branch-2.3 31e46ec60 -> 9db81fd86
[SPARK-25313][BRANCH-2.3][SQL] Fix regression in FileFormatWriter output names Port https://github.com/apache/spark/pull/22320 to branch-2.3 ## What changes were proposed in this pull request? Let's see the follow example: ``` val location = "/tmp/t" val df = spark.range(10).toDF("id") df.write.format("parquet").saveAsTable("tbl") spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") spark.sql(s"CREATE TABLE tbl2(ID long) USING parquet location $location") spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1") println(spark.read.parquet(location).schema) spark.table("tbl2").show() ``` The output column name in schema will be `id` instead of `ID`, thus the last query shows nothing from `tbl2`. By enabling the debug message we can see that the output naming is changed from `ID` to `id`, and then the `outputColumns` in `InsertIntoHadoopFsRelationCommand` is changed in `RemoveRedundantAliases`. ![wechatimg5](https://user-images.githubusercontent.com/1097932/44947871-6299f200-ae46-11e8-9c96-d45fe368206c.jpeg) ![wechatimg4](https://user-images.githubusercontent.com/1097932/44947866-56ae3000-ae46-11e8-8923-8b3bbe060075.jpeg) **To guarantee correctness**, we should change the output columns from `Seq[Attribute]` to `Seq[String]` to avoid its names being replaced by optimizer. I will fix project elimination related rules in https://github.com/apache/spark/pull/22311 after this one. ## How was this patch tested? Unit test. Closes #22346 from gengliangwang/portSchemaOutputName2.3. Authored-by: Gengliang Wang <gengliang.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9db81fd8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9db81fd8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9db81fd8 Branch: refs/heads/branch-2.3 Commit: 9db81fd864dcc97bed2bf5bd2028787c3f07a6d0 Parents: 31e46ec Author: Gengliang Wang <gengliang.w...@databricks.com> Authored: Thu Sep 6 23:02:55 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Thu Sep 6 23:02:55 2018 +0800 ---------------------------------------------------------------------- .../execution/command/DataWritingCommand.scala | 43 +++++++++++- .../command/createDataSourceTables.scala | 4 +- .../sql/execution/datasources/DataSource.scala | 16 +++-- .../datasources/DataSourceStrategy.scala | 4 +- .../InsertIntoHadoopFsRelationCommand.scala | 6 +- .../sql/test/DataFrameReaderWriterSuite.scala | 74 ++++++++++++++++++++ .../apache/spark/sql/hive/HiveStrategies.scala | 6 +- .../CreateHiveTableAsSelectCommand.scala | 9 +-- .../execution/InsertIntoHiveDirCommand.scala | 2 +- .../hive/execution/InsertIntoHiveTable.scala | 2 +- .../spark/sql/hive/execution/HiveDDLSuite.scala | 48 +++++++++++++ 11 files changed, 189 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index e11dbd2..0a185b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker import org.apache.spark.sql.execution.datasources.FileFormatWriter -import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration /** @@ -41,8 +42,12 @@ trait DataWritingCommand extends Command { override final def children: Seq[LogicalPlan] = query :: Nil - // Output columns of the analyzed input query plan - def outputColumns: Seq[Attribute] + // Output column names of the analyzed input query plan. + def outputColumnNames: Seq[String] + + // Output columns of the analyzed input query plan. + def outputColumns: Seq[Attribute] = + DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames) lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics @@ -53,3 +58,35 @@ trait DataWritingCommand extends Command { def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] } + +object DataWritingCommand { + /** + * Returns output attributes with provided names. + * The length of provided names should be the same of the length of [[LogicalPlan.output]]. + */ + def logicalPlanOutputWithNames( + query: LogicalPlan, + names: Seq[String]): Seq[Attribute] = { + // Save the output attributes to a variable to avoid duplicated function calls. + val outputAttributes = query.output + assert(outputAttributes.length == names.length, + "The length of provided names doesn't match the length of output attributes.") + outputAttributes.zip(names).map { case (attr, outputName) => + attr.withName(outputName) + } + } + + /** + * Returns schema of logical plan with provided names. + * The length of provided names should be the same of the length of [[LogicalPlan.schema]]. + */ + def logicalPlanSchemaWithNames( + query: LogicalPlan, + names: Seq[String]): StructType = { + assert(query.schema.length == names.length, + "The length of provided names doesn't match the length of query schema.") + StructType(query.schema.zip(names).map { case (structField, outputName) => + structField.copy(name = outputName) + }) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index e974776..93ac174 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -139,7 +139,7 @@ case class CreateDataSourceTableAsSelectCommand( table: CatalogTable, mode: SaveMode, query: LogicalPlan, - outputColumns: Seq[Attribute]) + outputColumnNames: Seq[String]) extends DataWritingCommand { override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { @@ -213,7 +213,7 @@ case class CreateDataSourceTableAsSelectCommand( catalogTable = if (tableExists) Some(table) else None) try { - dataSource.writeAndRead(mode, query, outputColumns, physicalPlan) + dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan) } catch { case ex: AnalysisException => logError(s"Failed to write to table ${table.identifier.unquotedString}", ex) http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 6e1b572..f4aebef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider import org.apache.spark.sql.execution.datasources.json.JsonFileFormat @@ -479,7 +480,7 @@ case class DataSource( mode = mode, catalogTable = catalogTable, fileIndex = fileIndex, - outputColumns = data.output) + outputColumnNames = data.output.map(_.name)) } /** @@ -489,9 +490,9 @@ case class DataSource( * @param mode The save mode for this writing. * @param data The input query plan that produces the data to be written. Note that this plan * is analyzed and optimized. - * @param outputColumns The original output columns of the input query plan. The optimizer may not - * preserve the output column's names' case, so we need this parameter - * instead of `data.output`. + * @param outputColumnNames The original output column names of the input query plan. The + * optimizer may not preserve the output column's names' case, so we need + * this parameter instead of `data.output`. * @param physicalPlan The physical plan of the input query plan. We should run the writing * command with this physical plan instead of creating a new physical plan, * so that the metrics can be correctly linked to the given physical plan and @@ -500,8 +501,9 @@ case class DataSource( def writeAndRead( mode: SaveMode, data: LogicalPlan, - outputColumns: Seq[Attribute], + outputColumnNames: Seq[String], physicalPlan: SparkPlan): BaseRelation = { + val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames) if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { throw new AnalysisException("Cannot save interval data type into external storage.") } @@ -524,7 +526,9 @@ case class DataSource( s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]") } } - val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns) + val resolved = cmd.copy( + partitionColumns = resolvedPartCols, + outputColumnNames = outputColumnNames) resolved.run(sparkSession, physicalPlan) // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation() http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 3f41612..74aa9a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -139,7 +139,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast case CreateTable(tableDesc, mode, Some(query)) if query.resolved && DDLUtils.isDatasourceTable(tableDesc) => DDLUtils.checkDataColNames(tableDesc.copy(schema = query.schema)) - CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output) + CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name)) case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _), parts, query, overwrite, false) if parts.isEmpty => @@ -209,7 +209,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast mode, table, Some(t.location), - actualQuery.output) + actualQuery.output.map(_.name)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index dd7ef0d..3cc4a88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -55,14 +55,14 @@ case class InsertIntoHadoopFsRelationCommand( mode: SaveMode, catalogTable: Option[CatalogTable], fileIndex: Option[FileIndex], - outputColumns: Seq[Attribute]) + outputColumnNames: Seq[String]) extends DataWritingCommand { import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { // Most formats don't do well with duplicate columns, so lets not allow that - SchemaUtils.checkSchemaColumnNameDuplication( - query.schema, + SchemaUtils.checkColumnNameDuplication( + outputColumnNames, s"when inserting into $outputPath", sparkSession.sessionState.conf.caseSensitiveAnalysis) http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index b3147b0..5c06551 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -794,6 +794,80 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } + test("Insert overwrite table command should output correct schema: basic") { + withTable("tbl", "tbl2") { + withView("view1") { + val df = spark.range(10).toDF("id") + df.write.format("parquet").saveAsTable("tbl") + spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") + spark.sql("CREATE TABLE tbl2(ID long) USING parquet") + spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1") + val identifier = TableIdentifier("tbl2") + val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString + val expectedSchema = StructType(Seq(StructField("ID", LongType, true))) + assert(spark.read.parquet(location).schema == expectedSchema) + checkAnswer(spark.table("tbl2"), df) + } + } + } + + test("Insert overwrite table command should output correct schema: complex") { + withTable("tbl", "tbl2") { + withView("view1") { + val df = spark.range(10).map(x => (x, x.toInt, x.toInt)).toDF("col1", "col2", "col3") + df.write.format("parquet").saveAsTable("tbl") + spark.sql("CREATE VIEW view1 AS SELECT * FROM tbl") + spark.sql("CREATE TABLE tbl2(COL1 long, COL2 int, COL3 int) USING parquet PARTITIONED " + + "BY (COL2) CLUSTERED BY (COL3) INTO 3 BUCKETS") + spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 FROM view1") + val identifier = TableIdentifier("tbl2") + val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString + val expectedSchema = StructType(Seq( + StructField("COL1", LongType, true), + StructField("COL3", IntegerType, true), + StructField("COL2", IntegerType, true))) + assert(spark.read.parquet(location).schema == expectedSchema) + checkAnswer(spark.table("tbl2"), df) + } + } + } + + test("Create table as select command should output correct schema: basic") { + withTable("tbl", "tbl2") { + withView("view1") { + val df = spark.range(10).toDF("id") + df.write.format("parquet").saveAsTable("tbl") + spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") + spark.sql("CREATE TABLE tbl2 USING parquet AS SELECT ID FROM view1") + val identifier = TableIdentifier("tbl2") + val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString + val expectedSchema = StructType(Seq(StructField("ID", LongType, true))) + assert(spark.read.parquet(location).schema == expectedSchema) + checkAnswer(spark.table("tbl2"), df) + } + } + } + + test("Create table as select command should output correct schema: complex") { + withTable("tbl", "tbl2") { + withView("view1") { + val df = spark.range(10).map(x => (x, x.toInt, x.toInt)).toDF("col1", "col2", "col3") + df.write.format("parquet").saveAsTable("tbl") + spark.sql("CREATE VIEW view1 AS SELECT * FROM tbl") + spark.sql("CREATE TABLE tbl2 USING parquet PARTITIONED BY (COL2) " + + "CLUSTERED BY (COL3) INTO 3 BUCKETS AS SELECT COL1, COL2, COL3 FROM view1") + val identifier = TableIdentifier("tbl2") + val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString + val expectedSchema = StructType(Seq( + StructField("COL1", LongType, true), + StructField("COL3", IntegerType, true), + StructField("COL2", IntegerType, true))) + assert(spark.read.parquet(location).schema == expectedSchema) + checkAnswer(spark.table("tbl2"), df) + } + } + } + test("use Spark jobs to list files") { withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") { withTempDir { dir => http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 8df05cb..fee6f00 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -149,7 +149,7 @@ object HiveAnalysis extends Rule[LogicalPlan] { case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists) if DDLUtils.isHiveTable(r.tableMeta) => InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, - ifPartitionNotExists, query.output) + ifPartitionNotExists, query.output.map(_.name)) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => DDLUtils.checkDataColNames(tableDesc) @@ -157,14 +157,14 @@ object HiveAnalysis extends Rule[LogicalPlan] { case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) => DDLUtils.checkDataColNames(tableDesc) - CreateHiveTableAsSelectCommand(tableDesc, query, query.output, mode) + CreateHiveTableAsSelectCommand(tableDesc, query, query.output.map(_.name), mode) case InsertIntoDir(isLocal, storage, provider, child, overwrite) if DDLUtils.isHiveTable(provider) => val outputPath = new Path(storage.locationUri.get) if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath) - InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output) + InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output.map(_.name)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 1e801fe..1ff680f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommand case class CreateHiveTableAsSelectCommand( tableDesc: CatalogTable, query: LogicalPlan, - outputColumns: Seq[Attribute], + outputColumnNames: Seq[String], mode: SaveMode) extends DataWritingCommand { @@ -63,13 +63,14 @@ case class CreateHiveTableAsSelectCommand( query, overwrite = false, ifPartitionNotExists = false, - outputColumns = outputColumns).run(sparkSession, child) + outputColumnNames = outputColumnNames).run(sparkSession, child) } else { // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data // processing. assert(tableDesc.schema.isEmpty) - catalog.createTable(tableDesc.copy(schema = query.schema), ignoreIfExists = false) + val schema = DataWritingCommand.logicalPlanSchemaWithNames(query, outputColumnNames) + catalog.createTable(tableDesc.copy(schema = schema), ignoreIfExists = false) try { // Read back the metadata of the table which was created just now. @@ -82,7 +83,7 @@ case class CreateHiveTableAsSelectCommand( query, overwrite = true, ifPartitionNotExists = false, - outputColumns = outputColumns).run(sparkSession, child) + outputColumnNames = outputColumnNames).run(sparkSession, child) } catch { case NonFatal(e) => // drop the created table. http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index cebeca0..0a73aaa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -57,7 +57,7 @@ case class InsertIntoHiveDirCommand( storage: CatalogStorageFormat, query: LogicalPlan, overwrite: Boolean, - outputColumns: Seq[Attribute]) extends SaveAsHiveFile { + outputColumnNames: Seq[String]) extends SaveAsHiveFile { override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { assert(storage.locationUri.nonEmpty) http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 02a60f1..75a0563 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -69,7 +69,7 @@ case class InsertIntoHiveTable( query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean, - outputColumns: Seq[Attribute]) extends SaveAsHiveFile { + outputColumnNames: Seq[String]) extends SaveAsHiveFile { /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the http://git-wip-us.apache.org/repos/asf/spark/blob/9db81fd8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index db76ec9..7eaac85 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -751,6 +751,54 @@ class HiveDDLSuite } } + test("Insert overwrite Hive table should output correct schema") { + withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") { + withTable("tbl", "tbl2") { + withView("view1") { + spark.sql("CREATE TABLE tbl(id long)") + spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4") + spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") + withTempPath { path => + sql( + s""" + |CREATE TABLE tbl2(ID long) USING hive + |OPTIONS(fileFormat 'parquet') + |LOCATION '${path.toURI}' + """.stripMargin) + spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1") + val expectedSchema = StructType(Seq(StructField("ID", LongType, true))) + assert(spark.read.parquet(path.toString).schema == expectedSchema) + checkAnswer(spark.table("tbl2"), Seq(Row(4))) + } + } + } + } + } + + test("Create Hive table as select should output correct schema") { + withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") { + withTable("tbl", "tbl2") { + withView("view1") { + spark.sql("CREATE TABLE tbl(id long)") + spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4") + spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") + withTempPath { path => + sql( + s""" + |CREATE TABLE tbl2 USING hive + |OPTIONS(fileFormat 'parquet') + |LOCATION '${path.toURI}' + |AS SELECT ID FROM view1 + """.stripMargin) + val expectedSchema = StructType(Seq(StructField("ID", LongType, true))) + assert(spark.read.parquet(path.toString).schema == expectedSchema) + checkAnswer(spark.table("tbl2"), Seq(Row(4))) + } + } + } + } + } + test("alter table partition - storage information") { sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)") sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org