Repository: spark
Updated Branches:
  refs/heads/master 487faf17a -> e3de6ab30


[SPARK-24068] Propagating DataFrameReader's options to Text datasource on schema inferring

## What changes were proposed in this pull request?

While reading CSV or JSON files, DataFrameReader's options are converted to Hadoop parameters, for example here:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L302

but the options are not propagated to the Text datasource during schema inference, for instance:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala#L184-L188

This PR propagates the user's options to the Text datasource during schema inference, in the same way the options are converted to Hadoop parameters when a schema is specified.
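
For comparison, a minimal sketch of the case that already worked before this change: with an explicit schema there is no inference pass over the Text datasource, so the codec option already reached Hadoop (assuming the hadoop-lzo jar from the test section below is on the classpath):

```scala
import org.apache.spark.sql.types._

// Sketch only: supplying a schema up front skips schema inference,
// so the reader options were already converted to Hadoop parameters.
val schema = new StructType()
  .add("col1", StringType)
  .add("col2", IntegerType)
spark.read
  .schema(schema)
  .option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
  .option("header", true)
  .option("sep", "|")
  .csv("test.csv.lzo")
  .show()
```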

## How was this patch tested?
The changes were tested manually using https://github.com/twitter/hadoop-lzo. Build the codec jar and create a convenient symlink:

```shell
hadoop-lzo> mvn clean package
hadoop-lzo> ln -s ./target/hadoop-lzo-0.4.21-SNAPSHOT.jar ./hadoop-lzo.jar
```
Create two test files, one in CSV and one in JSON format, and compress them with lzop:
```shell
$ cat test.csv
col1|col2
a|1
$ lzop test.csv
$ cat test.json
{"col1":"a","col2":1}
$ lzop test.json
```
Run `spark-shell` with the hadoop-lzo jar:
```shell
bin/spark-shell --jars ~/hadoop-lzo/hadoop-lzo.jar
```
Read the compressed CSV and JSON files without specifying a schema:
```scala
spark.read.option("io.compression.codecs", 
"com.hadoop.compression.lzo.LzopCodec").option("inferSchema",true).option("header",true).option("sep","|").csv("test.csv.lzo").show()
+----+----+
|col1|col2|
+----+----+
|   a|   1|
+----+----+
```
```scala
spark.read.option("io.compression.codecs", 
"com.hadoop.compression.lzo.LzopCodec").option("multiLine", 
true).json("test.json.lzo").printSchema
root
 |-- col1: string (nullable = true)
 |-- col2: long (nullable = true)
```
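
The parsed rows can be spot-checked in the same session; since test.json above contains a single record, `show()` should print one row:

```scala
spark.read
  .option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
  .option("multiLine", true)
  .json("test.json.lzo")
  .show()
```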

Author: Maxim Gekk <maxim.g...@databricks.com>
Author: Maxim Gekk <max.g...@gmail.com>

Closes #21182 from MaxGekk/text-options.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3de6ab3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3de6ab3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3de6ab3

Branch: refs/heads/master
Commit: e3de6ab30d52890eb08578e55eb4a5d2b4e7aa35
Parents: 487faf1
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Wed May 9 08:32:20 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Wed May 9 08:32:20 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/json/JSONOptions.scala  |  2 +-
 .../sql/execution/datasources/csv/CSVDataSource.scala     |  6 ++++--
 .../spark/sql/execution/datasources/csv/CSVOptions.scala  |  2 +-
 .../spark/sql/execution/datasources/csv/CSVUtils.scala    |  2 --
 .../sql/execution/datasources/csv/UnivocityParser.scala   |  2 --
 .../sql/execution/datasources/json/JsonDataSource.scala   | 10 ++++++----
 6 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 2579374..2ff12ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util._
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: CaseInsensitiveMap[String],
+    @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
     defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable  {

http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index bc1f4ab..dc54d18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -185,7 +185,8 @@ object TextInputCSVDataSource extends CSVDataSource {
         DataSource.apply(
           sparkSession,
           paths = paths,
-          className = classOf[TextFileFormat].getName
+          className = classOf[TextFileFormat].getName,
+          options = options.parameters
         ).resolveRelation(checkFilesExist = false))
         .select("value").as[String](Encoders.STRING)
     } else {
@@ -250,7 +251,8 @@ object MultiLineCSVDataSource extends CSVDataSource {
       options: CSVOptions): RDD[PortableDataStream] = {
     val paths = inputPaths.map(_.getPath)
     val name = paths.mkString(",")
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+    val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
+      options.parameters))
     FileInputFormat.setInputPaths(job, paths: _*)
     val conf = job.getConfiguration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 2ec0fc6..ed2dc65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util._
 
 class CSVOptions(
-    @transient private val parameters: CaseInsensitiveMap[String],
+    @transient val parameters: CaseInsensitiveMap[String],
     defaultTimeZoneId: String,
     defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable {

http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index 31464f1..9dae41b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
-import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3d6cc30..99557a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.csv
 
 import java.io.InputStream
 import java.math.BigDecimal
-import java.text.NumberFormat
-import java.util.Locale
 
 import scala.util.Try
 import scala.util.control.NonFatal

http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 983a5f0..ba83df0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -121,7 +121,7 @@ object TextInputJsonDataSource extends JsonDataSource {
         sparkSession,
         paths = paths,
         className = classOf[TextFileFormat].getName,
-        options = textOptions
+        options = parsedOptions.parameters
       ).resolveRelation(checkFilesExist = false))
       .select("value").as(Encoders.STRING)
   }
@@ -159,7 +159,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
+    val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions)
     val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
     val parser = parsedOptions.encoding
       .map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
@@ -170,9 +170,11 @@ object MultiLineJsonDataSource extends JsonDataSource {
 
   private def createBaseRdd(
       sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = {
+      inputPaths: Seq[FileStatus],
+      parsedOptions: JSONOptions): RDD[PortableDataStream] = {
     val paths = inputPaths.map(_.getPath)
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+    val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions(
+      parsedOptions.parameters))
     val conf = job.getConfiguration
     val name = paths.mkString(",")
     FileInputFormat.setInputPaths(job, paths: _*)

