Repository: spark Updated Branches: refs/heads/master 487faf17a -> e3de6ab30
[SPARK-24068] Propagating DataFrameReader's options to Text datasource on schema inferring ## What changes were proposed in this pull request? While reading CSV or JSON files, DataFrameReader's options are converted to Hadoop's parameters, for example there: https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L302 but the options are not propagated to Text datasource on schema inferring, for instance: https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala#L184-L188 The PR proposes propagation of user's options to Text datasource on scheme inferring in similar way as user's options are converted to Hadoop parameters if schema is specified. ## How was this patch tested? The changes were tested manually by using https://github.com/twitter/hadoop-lzo: ``` hadoop-lzo> mvn clean package hadoop-lzo> ln -s ./target/hadoop-lzo-0.4.21-SNAPSHOT.jar ./hadoop-lzo.jar ``` Create 2 test files in JSON and CSV format and compress them: ```shell $ cat test.csv col1|col2 a|1 $ lzop test.csv $ cat test.json {"col1":"a","col2":1} $ lzop test.json ``` Run `spark-shell` with hadoop-lzo: ``` bin/spark-shell --jars ~/hadoop-lzo/hadoop-lzo.jar ``` reading compressed CSV and JSON without schema: ```scala spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("inferSchema",true).option("header",true).option("sep","|").csv("test.csv.lzo").show() +----+----+ |col1|col2| +----+----+ | a| 1| +----+----+ ``` ```scala spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("multiLine", true).json("test.json.lzo").printSchema root |-- col1: string (nullable = true) |-- col2: long (nullable = true) ``` Author: Maxim Gekk <maxim.g...@databricks.com> Author: Maxim Gekk <max.g...@gmail.com> Closes #21182 from MaxGekk/text-options. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3de6ab3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3de6ab3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3de6ab3 Branch: refs/heads/master Commit: e3de6ab30d52890eb08578e55eb4a5d2b4e7aa35 Parents: 487faf1 Author: Maxim Gekk <maxim.g...@databricks.com> Authored: Wed May 9 08:32:20 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Wed May 9 08:32:20 2018 +0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/catalyst/json/JSONOptions.scala | 2 +- .../sql/execution/datasources/csv/CSVDataSource.scala | 6 ++++-- .../spark/sql/execution/datasources/csv/CSVOptions.scala | 2 +- .../spark/sql/execution/datasources/csv/CSVUtils.scala | 2 -- .../sql/execution/datasources/csv/UnivocityParser.scala | 2 -- .../sql/execution/datasources/json/JsonDataSource.scala | 10 ++++++---- 6 files changed, 12 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 2579374..2ff12ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util._ * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]]. */ private[sql] class JSONOptions( - @transient private val parameters: CaseInsensitiveMap[String], + @transient val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String, defaultColumnNameOfCorruptRecord: String) extends Logging with Serializable { http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index bc1f4ab..dc54d18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -185,7 +185,8 @@ object TextInputCSVDataSource extends CSVDataSource { DataSource.apply( sparkSession, paths = paths, - className = classOf[TextFileFormat].getName + className = classOf[TextFileFormat].getName, + options = options.parameters ).resolveRelation(checkFilesExist = false)) .select("value").as[String](Encoders.STRING) } else { @@ -250,7 +251,8 @@ object MultiLineCSVDataSource extends CSVDataSource { options: CSVOptions): RDD[PortableDataStream] = { val paths = inputPaths.map(_.getPath) val name = paths.mkString(",") - val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) + val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions( + options.parameters)) FileInputFormat.setInputPaths(job, paths: _*) val conf = job.getConfiguration http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 2ec0fc6..ed2dc65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util._ class CSVOptions( - @transient private val parameters: CaseInsensitiveMap[String], + @transient val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String, defaultColumnNameOfCorruptRecord: String) extends Logging with Serializable { http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala index 31464f1..9dae41b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala @@ -17,10 +17,8 @@ package org.apache.spark.sql.execution.datasources.csv -import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset -import org.apache.spark.sql.catalyst.json.JSONOptions import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 3d6cc30..99557a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.InputStream import java.math.BigDecimal -import java.text.NumberFormat -import java.util.Locale import scala.util.Try import scala.util.control.NonFatal http://git-wip-us.apache.org/repos/asf/spark/blob/e3de6ab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 983a5f0..ba83df0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -121,7 +121,7 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession, paths = paths, className = classOf[TextFileFormat].getName, - options = textOptions + options = parsedOptions.parameters ).resolveRelation(checkFilesExist = false)) .select("value").as(Encoders.STRING) } @@ -159,7 +159,7 @@ object MultiLineJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { - val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths) + val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths, parsedOptions) val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions) val parser = parsedOptions.encoding .map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream)) @@ -170,9 +170,11 @@ object MultiLineJsonDataSource extends JsonDataSource { private def createBaseRdd( sparkSession: SparkSession, - inputPaths: Seq[FileStatus]): RDD[PortableDataStream] = { + inputPaths: Seq[FileStatus], + parsedOptions: JSONOptions): RDD[PortableDataStream] = { val paths = inputPaths.map(_.getPath) - val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) + val job = Job.getInstance(sparkSession.sessionState.newHadoopConfWithOptions( + parsedOptions.parameters)) val conf = job.getConfiguration val name = paths.mkString(",") FileInputFormat.setInputPaths(job, paths: _*) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org