Repository: spark

Updated Branches:
  refs/heads/master 7b9071eea -> 5827b65e2
[SPARK-15808][SQL] File Format Checking When Appending Data

#### What changes were proposed in this pull request?
**Issue:** We got wrong results or strange errors when appending data to a table with a mismatched file format.

_Example 1: Parquet -> ORC_
```Scala
createDF(0, 9).write.format("parquet").saveAsTable("appendOrcToParquet")
createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet")
```
Error we got:
```
Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.RuntimeException: file:/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/warehouse-bc8fedf2-aa6a-4002-a18b-524c6ac859d4/appendorctoparquet/part-r-00000-c0e3f365-1d46-4df5-a82c-b47d7af9feb9.snappy.orc is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [79, 82, 67, 23]
```

_Example 2: JSON -> Parquet_
```Scala
createDF(0, 9).write.format("json").saveAsTable("appendParquetToJson")
createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("appendParquetToJson")
```
No exception, but wrong results:
```
+----+----+
|  c1|  c2|
+----+----+
|null|null|
|null|null|
|null|null|
|null|null|
|   0|str0|
|   1|str1|
|   2|str2|
|   3|str3|
|   4|str4|
|   5|str5|
|   6|str6|
|   7|str7|
|   8|str8|
|   9|str9|
+----+----+
```

_Example 3: JSON -> Text_
```Scala
createDF(0, 9).write.format("json").saveAsTable("appendTextToJson")
createDF(10, 19).write.mode(SaveMode.Append).format("text").saveAsTable("appendTextToJson")
```
Error we got:
```
Text data source supports only a single column, and you have 2 columns.
```

This PR makes such appends fail fast: when the specified format does not match the file format of the existing table, we now throw an `AnalysisException` with an appropriate error message.

#### How was this patch tested?
Added test cases.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #13546 from gatorsmile/fileFormatCheck.
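For context, a minimal sketch of the intended behavior after this patch, written as a hypothetical `spark-shell` session (the table name `t` is illustrative; `createDF` mirrors the helper used in the examples above):

```Scala
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode}
import spark.implicits._  // provided automatically in spark-shell

// Helper mirroring the one used in the examples above.
def createDF(from: Int, to: Int): DataFrame =
  (from to to).map(i => i -> s"str$i").toDF("c1", "c2")

createDF(0, 9).write.format("parquet").saveAsTable("t")

// With this patch, the mismatched append fails fast with an AnalysisException
// instead of writing ORC files into a Parquet table:
try {
  createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("t")
} catch {
  case e: AnalysisException =>
    // Reports the existing table's file format and the mismatched specified format.
    println(e.getMessage)
}
```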
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5827b65e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5827b65e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5827b65e

Branch: refs/heads/master
Commit: 5827b65e28da168286c771c53a38620d79f5e74f
Parents: 7b9071e
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Mon Jun 13 19:31:40 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jun 13 19:31:40 2016 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |  9 +++
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 72 ++++++++++++++++++++
 2 files changed, 81 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5827b65e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 865e406..4918780 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -197,6 +197,15 @@ case class CreateDataSourceTableAsSelectCommand(
         EliminateSubqueryAliases(
           sessionState.catalog.lookupRelation(tableIdent)) match {
           case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
+            // check if the file formats match
+            l.relation match {
+              case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
+                throw new AnalysisException(
+                  s"The file format of the existing table $tableIdent is " +
+                    s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
+                    s"format `$provider`")
+              case _ =>
+            }
             if (query.schema.size != l.schema.size) {
               throw new AnalysisException(
                 s"The column number of the existing schema[${l.schema}] " +
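One thing worth noting about the check above: it compares the resolved `FileFormat` class rather than the user-supplied format name, so aliases of the same data source (e.g. `parquet` vs. `org.apache.spark.sql.parquet`) still append successfully; the second test below exercises exactly that. A standalone, hypothetical sketch of the idea (the stub classes and alias map are illustrative stand-ins, not Spark's actual provider-resolution code):

```Scala
object FormatCheckSketch {
  // Stub format classes standing in for resolved FileFormat implementations.
  class ParquetFormatStub
  class OrcFormatStub

  // Hypothetical alias table standing in for Spark's provider resolution
  // (the dataSource.providingClass lookup used in the patch).
  val providers: Map[String, Class[_]] = Map(
    "parquet" -> classOf[ParquetFormatStub],
    "org.apache.spark.sql.parquet" -> classOf[ParquetFormatStub],
    "orc" -> classOf[OrcFormatStub])

  // Mirrors the shape of the check: compare resolved classes, not raw names.
  def checkFormat(existingFormat: Class[_], specifiedProvider: String): Unit = {
    if (existingFormat != providers(specifiedProvider)) {
      throw new IllegalArgumentException(
        s"The file format of the existing table is `${existingFormat.getName}`. " +
          s"It doesn't match the specified format `$specifiedProvider`")
    }
  }

  def main(args: Array[String]): Unit = {
    checkFormat(classOf[ParquetFormatStub], "org.apache.spark.sql.parquet") // ok: same class
    checkFormat(classOf[ParquetFormatStub], "orc")                          // throws
  }
}
```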
" + + "It doesn't match the specified format `orc`")) + } + + withTable("appendParquetToJson") { + createDF(0, 9).write.format("json").saveAsTable("appendParquetToJson") + val e = intercept[AnalysisException] { + createDF(10, 19).write.mode(SaveMode.Append).format("parquet") + .saveAsTable("appendParquetToJson") + } + assert(e.getMessage.contains("The file format of the existing table `appendParquetToJson` " + + "is `org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " + + "It doesn't match the specified format `parquet`")) + } + + withTable("appendTextToJson") { + createDF(0, 9).write.format("json").saveAsTable("appendTextToJson") + val e = intercept[AnalysisException] { + createDF(10, 19).write.mode(SaveMode.Append).format("text") + .saveAsTable("appendTextToJson") + } + assert(e.getMessage.contains("The file format of the existing table `appendTextToJson` is " + + "`org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " + + "It doesn't match the specified format `text`")) + } + } + + test("append a table using the same formats but different names") { + def createDF(from: Int, to: Int): DataFrame = { + (from to to).map(i => i -> s"str$i").toDF("c1", "c2") + } + + withTable("appendParquet") { + createDF(0, 9).write.format("parquet").saveAsTable("appendParquet") + createDF(10, 19).write.mode(SaveMode.Append).format("org.apache.spark.sql.parquet") + .saveAsTable("appendParquet") + checkAnswer( + sql("SELECT p.c1, p.c2 FROM appendParquet p WHERE p.c1 > 5"), + (6 to 19).map(i => Row(i, s"str$i"))) + } + + withTable("appendParquet") { + createDF(0, 9).write.format("org.apache.spark.sql.parquet").saveAsTable("appendParquet") + createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("appendParquet") + checkAnswer( + sql("SELECT p.c1, p.c2 FROM appendParquet p WHERE p.c1 > 5"), + (6 to 19).map(i => Row(i, s"str$i"))) + } + + withTable("appendParquet") { + createDF(0, 9).write.format("org.apache.spark.sql.parquet.DefaultSource") + .saveAsTable("appendParquet") + createDF(10, 19).write.mode(SaveMode.Append) + .format("org.apache.spark.sql.execution.datasources.parquet.DefaultSource") + .saveAsTable("appendParquet") + checkAnswer( + sql("SELECT p.c1, p.c2 FROM appendParquet p WHERE p.c1 > 5"), + (6 to 19).map(i => Row(i, s"str$i"))) + } + } + test("SPARK-8156:create table to specific database by 'use dbname' ") { val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org