Repository: spark Updated Branches: refs/heads/master a4b14a9cf -> 46fe40838
[SPARK-25669][SQL] Check CSV header only when it exists ## What changes were proposed in this pull request? Currently the first row of a dataset of CSV strings is compared to the field names of the user-specified or inferred schema, regardless of whether a CSV header is present. This causes false-positive error messages. For example, parsing `"1,2"` outputs the error: ```java java.lang.IllegalArgumentException: CSV header does not conform to the schema. Header: 1, 2 Schema: _c0, _c1 Expected: _c0 but found: 1 ``` In the PR, I propose: - Checking the CSV header only when it exists - Filtering the header from the input dataset only when it exists ## How was this patch tested? Added a test to `CSVSuite` which reproduces the issue. Closes #22656 from MaxGekk/inferred-header-check. Authored-by: Maxim Gekk <maxim.g...@databricks.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46fe4083 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46fe4083 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46fe4083 Branch: refs/heads/master Commit: 46fe40838aa682a7073dd6f1373518b0c8498a94 Parents: a4b14a9 Author: Maxim Gekk <maxim.g...@databricks.com> Authored: Tue Oct 9 14:35:00 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Tue Oct 9 14:35:00 2018 +0800 ---------------------------------------------------------------------- .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 7 +++++-- .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 6 ++++++ 2 files changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/46fe4083/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index fe69f25..7269446 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -505,7 +505,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val actualSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) - val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine => + val linesWithoutHeader = if (parsedOptions.headerFlag && maybeFirstLine.isDefined) { + val firstLine = maybeFirstLine.get val parser = new CsvParser(parsedOptions.asParserSettings) val columnNames = parser.parseLine(firstLine) CSVDataSource.checkHeaderColumnNames( @@ -515,7 +516,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { parsedOptions.enforceSchema, sparkSession.sessionState.conf.caseSensitiveAnalysis) filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions)) - }.getOrElse(filteredLines.rdd) + } else { + filteredLines.rdd + } val parsed = linesWithoutHeader.mapPartitions { iter => val rawParser = new UnivocityParser(actualSchema, parsedOptions) http://git-wip-us.apache.org/repos/asf/spark/blob/46fe4083/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index f70df0b..5d4746c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1820,4 +1820,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te 
checkAnswer(spark.read.option("multiLine", true).schema(schema).csv(input), Row(null)) assert(spark.read.csv(input).collect().toSet == Set(Row())) } + + test("field names of inferred schema shouldn't compare to the first row") { + val input = Seq("1,2").toDS() + val df = spark.read.option("enforceSchema", false).csv(input) + checkAnswer(df, Row("1", "2")) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org