Repository: spark
Updated Branches:
  refs/heads/master a04dcde8c -> 9281a3d50


[SPARK-19919][SQL] Defer throwing the exception for empty paths in CSV 
datasource into `DataSource`

## What changes were proposed in this pull request?

This PR proposes to defer throwing the exception for empty paths to `DataSource`.

Currently, if other datasources fail to infer the schema, they return `None`, and 
this is then validated in `DataSource` as below:

```
scala> spark.read.json("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for JSON. It 
must be specified manually.;
```

```
scala> spark.read.orc("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC. It must 
be specified manually.;
```

```
scala> spark.read.parquet("emptydir")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It 
must be specified manually.;
```

However, the CSV datasource checks this within its own implementation and throws 
a different exception with a different message, as below:

```
scala> spark.read.csv("emptydir")
java.lang.IllegalArgumentException: requirement failed: Cannot infer schema 
from an empty set of files
```

We could remove this duplicated check and validate this in one place in the 
same way with the same message.

## How was this patch tested?

Unit test in `CSVSuite` and manual test.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #17256 from HyukjinKwon/SPARK-19919.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9281a3d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9281a3d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9281a3d5

Branch: refs/heads/master
Commit: 9281a3d504d526440c1d445075e38a6d9142ac93
Parents: a04dcde
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Wed Mar 22 08:41:46 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Mar 22 08:41:46 2017 +0800

----------------------------------------------------------------------
 .../datasources/csv/CSVDataSource.scala         | 25 ++++++++++++++------
 .../datasources/csv/CSVFileFormat.scala         |  4 +---
 .../sql/test/DataFrameReaderWriterSuite.scala   |  6 +++--
 3 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9281a3d5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 63af18e..83bdf6f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -54,10 +54,21 @@ abstract class CSVDataSource extends Serializable {
   /**
    * Infers the schema from `inputPaths` files.
    */
-  def infer(
+  final def inferSchema(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      parsedOptions: CSVOptions): Option[StructType]
+      parsedOptions: CSVOptions): Option[StructType] = {
+    if (inputPaths.nonEmpty) {
+      Some(infer(sparkSession, inputPaths, parsedOptions))
+    } else {
+      None
+    }
+  }
+
+  protected def infer(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus],
+      parsedOptions: CSVOptions): StructType
 
   /**
    * Generates a header from the given row which is null-safe and 
duplicate-safe.
@@ -131,10 +142,10 @@ object TextInputCSVDataSource extends CSVDataSource {
   override def infer(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      parsedOptions: CSVOptions): Option[StructType] = {
+      parsedOptions: CSVOptions): StructType = {
     val csv = createBaseDataset(sparkSession, inputPaths, parsedOptions)
     val maybeFirstLine = CSVUtils.filterCommentAndEmpty(csv, 
parsedOptions).take(1).headOption
-    Some(inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions))
+    inferFromDataset(sparkSession, csv, maybeFirstLine, parsedOptions)
   }
 
   /**
@@ -203,7 +214,7 @@ object WholeFileCSVDataSource extends CSVDataSource {
   override def infer(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      parsedOptions: CSVOptions): Option[StructType] = {
+      parsedOptions: CSVOptions): StructType = {
     val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
     csv.flatMap { lines =>
       UnivocityParser.tokenizeStream(
@@ -222,10 +233,10 @@ object WholeFileCSVDataSource extends CSVDataSource {
             parsedOptions.headerFlag,
             new CsvParser(parsedOptions.asParserSettings))
         }
-        Some(CSVInferSchema.infer(tokenRDD, header, parsedOptions))
+        CSVInferSchema.infer(tokenRDD, header, parsedOptions)
       case None =>
         // If the first row could not be read, just return the empty schema.
-        Some(StructType(Nil))
+        StructType(Nil)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9281a3d5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index eef43c7..a99bdfe 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -51,12 +51,10 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    require(files.nonEmpty, "Cannot infer schema from an empty set of files")
-
     val parsedOptions =
       new CSVOptions(options, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
 
-    CSVDataSource(parsedOptions).infer(sparkSession, files, parsedOptions)
+    CSVDataSource(parsedOptions).inferSchema(sparkSession, files, 
parsedOptions)
   }
 
   override def prepareWrite(

http://git-wip-us.apache.org/repos/asf/spark/blob/9281a3d5/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 8a8ba05..8287776 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -370,9 +370,11 @@ class DataFrameReaderWriterSuite extends QueryTest with 
SharedSQLContext with Be
     val schema = df.schema
 
     // Reader, without user specified schema
-    intercept[IllegalArgumentException] {
+    val message = intercept[AnalysisException] {
       testRead(spark.read.csv(), Seq.empty, schema)
-    }
+    }.getMessage
+    assert(message.contains("Unable to infer schema for CSV. It must be 
specified manually."))
+
     testRead(spark.read.csv(dir), data, schema)
     testRead(spark.read.csv(dir, dir), data ++ data, schema)
     testRead(spark.read.csv(Seq(dir, dir): _*), data ++ data, schema)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to