Repository: spark
Updated Branches:
  refs/heads/master 926543664 -> f6471dc0d


[SPARK-19709][SQL] Read empty file with CSV data source

## What changes were proposed in this pull request?

Bugfix for reading an empty file with the CSV data source. Instead of throwing 
`NoSuchElementException` during schema inference, an empty schema is inferred and an empty data frame is returned.

## How was this patch tested?

Extended the empty-file unit test in `org.apache.spark.sql.execution.datasources.csv.CSVSuite` so that it runs with the `wholeFile` option both disabled and enabled.

Author: Wojtek Szymanski <wk.szyman...@gmail.com>

Closes #17068 from wojtek-szymanski/SPARK-19709.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6471dc0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6471dc0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6471dc0

Branch: refs/heads/master
Commit: f6471dc0d5db2d98e48f9f1ae1dba0f174ed9648
Parents: 9265436
Author: Wojtek Szymanski <wk.szyman...@gmail.com>
Authored: Mon Mar 6 13:19:36 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Mar 6 13:19:36 2017 -0800

----------------------------------------------------------------------
 .../datasources/csv/CSVDataSource.scala         | 68 +++++++++++---------
 .../execution/datasources/csv/CSVSuite.scala    | 10 ++-
 2 files changed, 40 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6471dc0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 73e6abc..4756703 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -133,20 +133,24 @@ object TextInputCSVDataSource extends CSVDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: CSVOptions): Option[StructType] = {
-    val csv: Dataset[String] = createBaseDataset(sparkSession, inputPaths, 
parsedOptions)
-    val firstLine: String = CSVUtils.filterCommentAndEmpty(csv, 
parsedOptions).first()
-    val firstRow = new 
CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
-    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-    val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
-    val tokenRDD = csv.rdd.mapPartitions { iter =>
-      val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
-      val linesWithoutHeader =
-        CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
-      val parser = new CsvParser(parsedOptions.asParserSettings)
-      linesWithoutHeader.map(parser.parseLine)
+    val csv = createBaseDataset(sparkSession, inputPaths, parsedOptions)
+    CSVUtils.filterCommentAndEmpty(csv, parsedOptions).take(1).headOption 
match {
+      case Some(firstLine) =>
+        val firstRow = new 
CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
+        val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+        val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+        val tokenRDD = csv.rdd.mapPartitions { iter =>
+          val filteredLines = CSVUtils.filterCommentAndEmpty(iter, 
parsedOptions)
+          val linesWithoutHeader =
+            CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
+          val parser = new CsvParser(parsedOptions.asParserSettings)
+          linesWithoutHeader.map(parser.parseLine)
+        }
+        Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
+      case None =>
+        // If the first line could not be read, just return the empty schema.
+        Some(StructType(Nil))
     }
-
-    Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
   }
 
   private def createBaseDataset(
@@ -190,28 +194,28 @@ object WholeFileCSVDataSource extends CSVDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: CSVOptions): Option[StructType] = {
-    val csv: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, 
parsedOptions)
-    val maybeFirstRow: Option[Array[String]] = csv.flatMap { lines =>
+    val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
+    csv.flatMap { lines =>
       UnivocityParser.tokenizeStream(
         
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, 
lines.getPath()),
-        false,
+        shouldDropHeader = false,
         new CsvParser(parsedOptions.asParserSettings))
-    }.take(1).headOption
-
-    if (maybeFirstRow.isDefined) {
-      val firstRow = maybeFirstRow.get
-      val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-      val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
-      val tokenRDD = csv.flatMap { lines =>
-        UnivocityParser.tokenizeStream(
-          
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, 
lines.getPath()),
-          parsedOptions.headerFlag,
-          new CsvParser(parsedOptions.asParserSettings))
-      }
-      Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
-    } else {
-      // If the first row could not be read, just return the empty schema.
-      Some(StructType(Nil))
+    }.take(1).headOption match {
+      case Some(firstRow) =>
+        val caseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+        val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+        val tokenRDD = csv.flatMap { lines =>
+          UnivocityParser.tokenizeStream(
+            CodecStreams.createInputStreamWithCloseResource(
+              lines.getConfiguration,
+              lines.getPath()),
+            parsedOptions.headerFlag,
+            new CsvParser(parsedOptions.asParserSettings))
+        }
+        Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
+      case None =>
+        // If the first row could not be read, just return the empty schema.
+        Some(StructType(Nil))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f6471dc0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 5607180..eaedede 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1077,14 +1077,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
-  test("Empty file produces empty dataframe with empty schema - wholeFile 
option") {
-    withTempPath { path =>
-      path.createNewFile()
-
+  test("Empty file produces empty dataframe with empty schema") {
+    Seq(false, true).foreach { wholeFile =>
       val df = spark.read.format("csv")
         .option("header", true)
-        .option("wholeFile", true)
-        .load(path.getAbsolutePath)
+        .option("wholeFile", wholeFile)
+        .load(testFile(emptyFile))
 
       assert(df.schema === spark.emptyDataFrame.schema)
       checkAnswer(df, spark.emptyDataFrame)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to