Repository: spark
Updated Branches:
  refs/heads/master dc005ed53 -> d3147502e
[SPARK-15615][SQL] Add an API to load DataFrame from Dataset[String] storing JSON

## What changes were proposed in this pull request?

SPARK-15615 proposes replacing sqlContext.read.json(rdd) with a Dataset equivalent. SPARK-15463 adds a CSV API for reading from Dataset[String], so this change keeps the API consistent. I am deprecating the existing RDD-based APIs.

## How was this patch tested?

There are existing tests. I left most tests using the existing APIs, since they delegate to the new json API.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: pj.fanning <pj.fann...@workday.com>
Author: PJ Fanning <pjfann...@users.noreply.github.com>

Closes #16895 from pjfanning/SPARK-15615.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3147502
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3147502
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3147502

Branch: refs/heads/master
Commit: d3147502e7837d81e27193164b3513bb69fa3797
Parents: dc005ed
Author: pj.fanning <pj.fann...@workday.com>
Authored: Wed Feb 22 18:03:25 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Feb 22 18:03:25 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d3147502/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index cb9493a..4c1341e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -323,6 +323,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param jsonRDD input RDD with one JSON object per record
    * @since 1.4.0
    */
+  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
   def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)

   /**
@@ -335,7 +336,22 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param jsonRDD input RDD with one JSON object per record
    * @since 1.4.0
    */
+  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
   def json(jsonRDD: RDD[String]): DataFrame = {
+    json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
+  }
+
+  /**
+   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
+   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
+   *
+   * Unless the schema is specified using `schema` function, this function goes through the
+   * input once to determine the input schema.
+   *
+   * @param jsonDataset input Dataset with one JSON object per record
+   * @since 2.2.0
+   */
+  def json(jsonDataset: Dataset[String]): DataFrame = {
     val parsedOptions = new JSONOptions(
       extraOptions.toMap,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
@@ -344,12 +360,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

     val schema = userSpecifiedSchema.getOrElse {
       JsonInferSchema.infer(
-        jsonRDD,
+        jsonDataset.rdd,
         parsedOptions,
         createParser)
     }

-    val parsed = jsonRDD.mapPartitions { iter =>
+    val parsed = jsonDataset.rdd.mapPartitions { iter =>
       val parser = new JacksonParser(schema, parsedOptions)
       iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))
     }
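
A minimal usage sketch (not part of the commit): it assumes a local SparkSession and illustrative sample records, and shows the new Dataset-based reader next to the now-deprecated RDD overload.

----------------------------------------------------------------------
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object JsonDatasetExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session, for demonstration only.
    val spark = SparkSession.builder()
      .appName("json-dataset-example")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // JSON Lines input: one JSON object per record (sample data).
    val jsonLines: Dataset[String] = Seq(
      """{"name":"alice","age":30}""",
      """{"name":"bob","age":25}"""
    ).toDS()

    // New in 2.2.0: infer the schema and parse directly from a Dataset[String].
    val df: DataFrame = spark.read.json(jsonLines)
    df.show()

    // Deprecated since 2.2.0: the RDD overload now wraps its input in a
    // Dataset via createDataset and delegates to the method above.
    val dfFromRdd: DataFrame = spark.read.json(jsonLines.rdd)

    spark.stop()
  }
}
----------------------------------------------------------------------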