Repository: spark
Updated Branches:
  refs/heads/master a4b14a9cf -> 46fe40838


[SPARK-25669][SQL] Check CSV header only when it exists

## What changes were proposed in this pull request?

Currently, the first row of a dataset of CSV strings is compared to the field 
names of the user-specified or inferred schema regardless of whether a CSV 
header is present. This causes false-positive error messages. For example, 
parsing `"1,2"` outputs the error:

```java
java.lang.IllegalArgumentException: CSV header does not conform to the schema.
 Header: 1, 2
 Schema: _c0, _c1
Expected: _c0 but found: 1
```

In the PR, I propose:
- Checking the CSV header only when it exists
- Filtering the header from the input dataset only when it exists

## How was this patch tested?

Added a test to `CSVSuite` which reproduces the issue.

Closes #22656 from MaxGekk/inferred-header-check.

Authored-by: Maxim Gekk <maxim.g...@databricks.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46fe4083
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46fe4083
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46fe4083

Branch: refs/heads/master
Commit: 46fe40838aa682a7073dd6f1373518b0c8498a94
Parents: a4b14a9
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Tue Oct 9 14:35:00 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Tue Oct 9 14:35:00 2018 +0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 7 +++++--
 .../apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 6 ++++++
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/46fe4083/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index fe69f25..7269446 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -505,7 +505,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     val actualSchema =
       StructType(schema.filterNot(_.name == 
parsedOptions.columnNameOfCorruptRecord))
 
-    val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
+    val linesWithoutHeader = if (parsedOptions.headerFlag && 
maybeFirstLine.isDefined) {
+      val firstLine = maybeFirstLine.get
       val parser = new CsvParser(parsedOptions.asParserSettings)
       val columnNames = parser.parseLine(firstLine)
       CSVDataSource.checkHeaderColumnNames(
@@ -515,7 +516,9 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
         parsedOptions.enforceSchema,
         sparkSession.sessionState.conf.caseSensitiveAnalysis)
       filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, 
parsedOptions))
-    }.getOrElse(filteredLines.rdd)
+    } else {
+      filteredLines.rdd
+    }
 
     val parsed = linesWithoutHeader.mapPartitions { iter =>
       val rawParser = new UnivocityParser(actualSchema, parsedOptions)

http://git-wip-us.apache.org/repos/asf/spark/blob/46fe4083/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f70df0b..5d4746c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1820,4 +1820,10 @@ class CSVSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils with Te
     checkAnswer(spark.read.option("multiLine", 
true).schema(schema).csv(input), Row(null))
     assert(spark.read.csv(input).collect().toSet == Set(Row()))
   }
+
+  test("field names of inferred schema shouldn't compare to the first row") {
+    val input = Seq("1,2").toDS()
+    val df = spark.read.option("enforceSchema", false).csv(input)
+    checkAnswer(df, Row("1", "2"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to