spark git commit: [SPARK-15982][SPARK-16009][SPARK-16007][SQL] Harmonize the behavior of DataFrameReader.text/csv/json/parquet/orc
Repository: spark Updated Branches: refs/heads/branch-2.0 8159da20e -> 54001cb12 [SPARK-15982][SPARK-16009][SPARK-16007][SQL] Harmonize the behavior of DataFrameReader.text/csv/json/parquet/orc ## What changes were proposed in this pull request? Issues with current reader behavior. - `text()` without args returns an empty DF with no columns -> inconsistent, its expected that text will always return a DF with `value` string field, - `textFile()` without args fails with exception because of the above reason, it expected the DF returned by `text()` to have a `value` field. - `orc()` does not have var args, inconsistent with others - `json(single-arg)` was removed, but that caused source compatibility issues - [SPARK-16009](https://issues.apache.org/jira/browse/SPARK-16009) - user specified schema was not respected when `text/csv/...` were used with no args - [SPARK-16007](https://issues.apache.org/jira/browse/SPARK-16007) The solution I am implementing is to do the following. - For each format, there will be a single argument method, and a vararg method. For json, parquet, csv, text, this means adding json(string), etc.. For orc, this means adding orc(varargs). - Remove the special handling of text(), csv(), etc. that returns empty dataframe with no fields. Rather pass on the empty sequence of paths to the datasource, and let each datasource handle it right. For e.g, text data source, should return empty DF with schema (value: string) - Deduped docs and fixed their formatting. ## How was this patch tested? Added new unit tests for Scala and Java tests Author: Tathagata Das Closes #13727 from tdas/SPARK-15982. (cherry picked from commit b99129cc452defc266f6d357f5baab5f4ff37a36) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54001cb1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54001cb1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54001cb1 Branch: refs/heads/branch-2.0 Commit: 54001cb129674be9f2459368fb608367f52371c2 Parents: 8159da2 Author: Tathagata Das Authored: Mon Jun 20 14:52:28 2016 -0700 Committer: Shixiong Zhu Committed: Mon Jun 20 14:52:35 2016 -0700 -- .../org/apache/spark/sql/DataFrameReader.scala | 132 + .../sql/JavaDataFrameReaderWriterSuite.java | 158 .../sql/test/DataFrameReaderWriterSuite.scala | 186 --- 3 files changed, 420 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/54001cb1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 2ae854d..841503b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -119,13 +119,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def load(): DataFrame = { -val dataSource = - DataSource( -sparkSession, -userSpecifiedSchema = userSpecifiedSchema, -className = source, -options = extraOptions.toMap) -Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())) +load(Seq.empty: _*) // force invocation of `load(...varargs...)` } /** @@ -135,7 +129,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def load(path: String): DataFrame = { -option("path", path).load() +load(Seq(path): _*) // force invocation of `load(...varargs...)` } /** @@ -146,18 +140,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def load(paths: String*): DataFrame = { -if (paths.isEmpty) { - sparkSession.emptyDataFrame -} else { - sparkSession.baseRelationToDataFrame( -DataSource.apply( - sparkSession, - paths = paths, - userSpecifiedSchema = userSpecifiedSchema, - className = source, - options = extraOptions.toMap).resolveRelation()) -} +sparkSession.baseRelationToDataFrame( + DataSource.apply( +sparkSession, +paths = paths, +userSpecifiedSchema = userSpecifiedSchema, +className = source, +options = extraOptions.toMap).resolveRelation()) } + /** * Construct a [[DataFrame]] representing the database table accessible via JDBC URL * url named table and connection properties. @@ -247,11 +238,23 @@ class DataFrameReader private[sql](sparkSession: SparkSession) ex
spark git commit: [SPARK-15982][SPARK-16009][SPARK-16007][SQL] Harmonize the behavior of DataFrameReader.text/csv/json/parquet/orc
Repository: spark Updated Branches: refs/heads/master 6df8e3886 -> b99129cc4 [SPARK-15982][SPARK-16009][SPARK-16007][SQL] Harmonize the behavior of DataFrameReader.text/csv/json/parquet/orc ## What changes were proposed in this pull request? Issues with current reader behavior. - `text()` without args returns an empty DF with no columns -> inconsistent, its expected that text will always return a DF with `value` string field, - `textFile()` without args fails with exception because of the above reason, it expected the DF returned by `text()` to have a `value` field. - `orc()` does not have var args, inconsistent with others - `json(single-arg)` was removed, but that caused source compatibility issues - [SPARK-16009](https://issues.apache.org/jira/browse/SPARK-16009) - user specified schema was not respected when `text/csv/...` were used with no args - [SPARK-16007](https://issues.apache.org/jira/browse/SPARK-16007) The solution I am implementing is to do the following. - For each format, there will be a single argument method, and a vararg method. For json, parquet, csv, text, this means adding json(string), etc.. For orc, this means adding orc(varargs). - Remove the special handling of text(), csv(), etc. that returns empty dataframe with no fields. Rather pass on the empty sequence of paths to the datasource, and let each datasource handle it right. For e.g, text data source, should return empty DF with schema (value: string) - Deduped docs and fixed their formatting. ## How was this patch tested? Added new unit tests for Scala and Java tests Author: Tathagata Das Closes #13727 from tdas/SPARK-15982. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b99129cc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b99129cc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b99129cc Branch: refs/heads/master Commit: b99129cc452defc266f6d357f5baab5f4ff37a36 Parents: 6df8e38 Author: Tathagata Das Authored: Mon Jun 20 14:52:28 2016 -0700 Committer: Shixiong Zhu Committed: Mon Jun 20 14:52:28 2016 -0700 -- .../org/apache/spark/sql/DataFrameReader.scala | 132 + .../sql/JavaDataFrameReaderWriterSuite.java | 158 .../sql/test/DataFrameReaderWriterSuite.scala | 186 --- 3 files changed, 420 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b99129cc/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 2ae854d..841503b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -119,13 +119,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def load(): DataFrame = { -val dataSource = - DataSource( -sparkSession, -userSpecifiedSchema = userSpecifiedSchema, -className = source, -options = extraOptions.toMap) -Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())) +load(Seq.empty: _*) // force invocation of `load(...varargs...)` } /** @@ -135,7 +129,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def load(path: String): DataFrame = { -option("path", path).load() +load(Seq(path): _*) // force invocation of `load(...varargs...)` } /** @@ -146,18 +140,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def load(paths: String*): DataFrame = { -if (paths.isEmpty) { - sparkSession.emptyDataFrame -} else { - sparkSession.baseRelationToDataFrame( -DataSource.apply( - sparkSession, - paths = paths, - userSpecifiedSchema = userSpecifiedSchema, - className = source, - options = extraOptions.toMap).resolveRelation()) -} +sparkSession.baseRelationToDataFrame( + DataSource.apply( +sparkSession, +paths = paths, +userSpecifiedSchema = userSpecifiedSchema, +className = source, +options = extraOptions.toMap).resolveRelation()) } + /** * Construct a [[DataFrame]] representing the database table accessible via JDBC URL * url named table and connection properties. @@ -247,11 +238,23 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { /** * Loads a JSON file (one object per line) and returns the result as a [[DataF