Repository: spark
Updated Branches:
  refs/heads/branch-2.0 94482b1e4 -> b96e7f6aa


[SPARK-15898][SQL] DataFrameReader.text should return DataFrame

## What changes were proposed in this pull request?

We want to maintain API compatibility for DataFrameReader.text, so it keeps 
returning a DataFrame, and will introduce a new API, DataFrameReader.textFile, 
which returns Dataset[String].
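
For reference, a minimal Scala sketch (not part of the patch) of how the two readers are expected to behave after this change; the input path is a hypothetical placeholder and an existing SparkSession is assumed:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Assumption: a SparkSession is available; the path below is a placeholder.
val spark = SparkSession.builder().appName("TextVsTextFile").getOrCreate()

// text() keeps its 1.6 return type: a DataFrame with a string column "value",
// followed by any partition columns discovered from the directory layout.
val df: DataFrame = spark.read.text("/path/to/input.txt")

// textFile() is the new API: a Dataset[String] containing only the line contents.
val ds: Dataset[String] = spark.read.textFile("/path/to/input.txt")
```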

affected PRs:
https://github.com/apache/spark/pull/11731
https://github.com/apache/spark/pull/13104
https://github.com/apache/spark/pull/13184

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #13604 from cloud-fan/revert.

(cherry picked from commit e2ab79d5ea00af45c083cc9a6607d2f0905f9908)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b96e7f6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b96e7f6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b96e7f6a

Branch: refs/heads/branch-2.0
Commit: b96e7f6aa8c6ed592d6e0ddbece7cf8530da2194
Parents: 94482b1
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Sun Jun 12 21:36:41 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sun Jun 12 21:36:47 2016 -0700

----------------------------------------------------------------------
 R/pkg/R/SQLContext.R                            |  7 +++--
 .../org/apache/spark/examples/JavaHdfsLR.java   |  2 +-
 .../org/apache/spark/examples/JavaPageRank.java |  2 +-
 .../apache/spark/examples/JavaWordCount.java    |  2 +-
 .../spark/examples/ml/JavaALSExample.java       |  2 +-
 .../apache/spark/examples/sql/JavaSparkSQL.java |  2 +-
 .../org/apache/spark/examples/SparkHdfsLR.scala |  2 +-
 .../org/apache/spark/examples/SparkKMeans.scala |  2 +-
 .../apache/spark/examples/SparkPageRank.scala   |  2 +-
 .../apache/spark/examples/ml/ALSExample.scala   |  2 +-
 .../examples/mllib/RankingMetricsExample.scala  |  2 +-
 python/pyspark/sql/readwriter.py                |  8 ++---
 .../org/apache/spark/sql/DataFrameReader.scala  | 31 +++++++++++++++-----
 .../apache/spark/sql/JavaDataFrameSuite.java    |  4 +--
 .../execution/datasources/text/TextSuite.scala  | 20 ++++++-------
 15 files changed, 54 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 584bbbf..e7e9e35 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -364,9 +364,10 @@ parquetFile <- function(x, ...) {
 
 #' Create a SparkDataFrame from a text file.
 #'
-#' Loads a text file and returns a SparkDataFrame with a single string column named "value".
-#' If the directory structure of the text files contains partitioning information, those are
-#' ignored in the resulting DataFrame.
+#' Loads text files and returns a SparkDataFrame whose schema starts with
+#' a string column named "value", and followed by partitioned columns if
+#' there are any.
+#'
 #' Each line in the text file is a new row in the resulting SparkDataFrame.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index ded4420..362bd44 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -126,7 +126,7 @@ public final class JavaHdfsLR {
       .appName("JavaHdfsLR")
       .getOrCreate();
 
-    JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
     JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
     int ITERATIONS = Integer.parseInt(args[1]);
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index 128b5ab..ed0bb87 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -82,7 +82,7 @@ public final class JavaPageRank {
     //     URL         neighbor URL
     //     URL         neighbor URL
     //     ...
-    JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
     // Loads all URLs from input file and initialize their neighbors.
     JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 1caee60..8f18604 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -46,7 +46,7 @@ public final class JavaWordCount {
       .appName("JavaWordCount")
       .getOrCreate();
 
-    JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
     JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
index 7f568f4..739558e 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java
@@ -87,7 +87,7 @@ public class JavaALSExample {
 
     // $example on$
     JavaRDD<Rating> ratingsRDD = spark
-      .read().text("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
+      .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
       .map(new Function<String, Rating>() {
         public Rating call(String str) {
           return Rating.parseRating(str);

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index 55e591d..e512979 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -59,7 +59,7 @@ public class JavaSparkSQL {
     System.out.println("=== Data source: RDD ===");
     // Load a text file and convert each line to a Java Bean.
     String file = "examples/src/main/resources/people.txt";
-    JavaRDD<Person> people = spark.read().text(file).javaRDD().map(
+    JavaRDD<Person> people = spark.read().textFile(file).javaRDD().map(
       new Function<String, Person>() {
         @Override
         public Person call(String line) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 84f133e..05ac6cb 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -72,7 +72,7 @@ object SparkHdfsLR {
       .getOrCreate()
 
     val inputPath = args(0)
-    val lines = spark.read.text(inputPath).rdd
+    val lines = spark.read.textFile(inputPath).rdd
 
     val points = lines.map(parsePoint).cache()
     val ITERATIONS = args(1).toInt

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index aa93c93..fec3160 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -71,7 +71,7 @@ object SparkKMeans {
       .appName("SparkKMeans")
       .getOrCreate()
 
-    val lines = spark.read.text(args(0)).rdd
+    val lines = spark.read.textFile(args(0)).rdd
     val data = lines.map(parseVector _).cache()
     val K = args(1).toInt
     val convergeDist = args(2).toDouble

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
index b7c363c..d0b874c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
@@ -56,7 +56,7 @@ object SparkPageRank {
       .getOrCreate()
 
     val iters = if (args.length > 1) args(1).toInt else 10
-    val lines = spark.read.text(args(0)).rdd
+    val lines = spark.read.textFile(args(0)).rdd
     val links = lines.map{ s =>
       val parts = s.split("\\s+")
       (parts(0), parts(1))

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
index da19ea9..bb5d163 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
@@ -50,7 +50,7 @@ object ALSExample {
     import spark.implicits._
 
     // $example on$
-    val ratings = spark.read.text("data/mllib/als/sample_movielens_ratings.txt")
+    val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
       .map(parseRating)
       .toDF()
     val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
index 781a934..d514891 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala
@@ -33,7 +33,7 @@ object RankingMetricsExample {
     import spark.implicits._
     // $example on$
     // Read in the ratings data
-    val ratings = spark.read.text("data/mllib/sample_movielens_data.txt").rdd.map { line =>
+    val ratings = spark.read.textFile("data/mllib/sample_movielens_data.txt").rdd.map { line =>
       val fields = line.split("::")
       Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
     }.cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index f3182b2..0f50f67 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -291,10 +291,10 @@ class DataFrameReader(object):
     @ignore_unicode_prefix
     @since(1.6)
     def text(self, paths):
-        """Loads a text file and returns a :class:`DataFrame` with a single 
string column named "value".
-        If the directory structure of the text files contains partitioning 
information,
-        those are ignored in the resulting DataFrame. To include partitioning 
information as
-        columns, use ``read.format('text').load(...)``.
+        """
+        Loads text files and returns a :class:`DataFrame` whose schema starts 
with a
+        string column named "value", and followed by partitioned columns if 
there
+        are any.
 
         Each line in the text file is a new row in the resulting DataFrame.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 078b63e..dfe31da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -450,29 +450,46 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   }
 
   /**
+   * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
+   * "value", and followed by partitioned columns if there are any.
+   *
+   * Each line in the text files is a new row in the resulting DataFrame. For example:
+   * {{{
+   *   // Scala:
+   *   spark.read.text("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.read().text("/path/to/spark/README.md")
+   * }}}
+   *
+   * @param paths input path
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def text(paths: String*): DataFrame = format("text").load(paths : _*)
+
+  /**
    * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
    * contains a single string column named "value".
    *
    * If the directory structure of the text files contains partitioning information, those are
-   * ignored in the resulting Dataset. To include partitioning information as columns, use
-   * `read.format("text").load("...")`.
+   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
    *
    * Each line in the text files is a new element in the resulting Dataset. For example:
    * {{{
    *   // Scala:
-   *   spark.read.text("/path/to/spark/README.md")
+   *   spark.read.textFile("/path/to/spark/README.md")
    *
    *   // Java:
-   *   spark.read().text("/path/to/spark/README.md")
+   *   spark.read().textFile("/path/to/spark/README.md")
    * }}}
    *
    * @param paths input path
    * @since 2.0.0
    */
   @scala.annotation.varargs
-  def text(paths: String*): Dataset[String] = {
-    format("text").load(paths : _*).select("value")
-      .as[String](sparkSession.implicits.newStringEncoder)
+  def textFile(paths: String*): Dataset[String] = {
+    text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
   }
 
   
///////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 0152f3f..318b53c 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -338,10 +338,10 @@ public class JavaDataFrameSuite {
 
   @Test
   public void testTextLoad() {
-    Dataset<String> ds1 = spark.read().text(getResource("text-suite.txt"));
+    Dataset<String> ds1 = spark.read().textFile(getResource("text-suite.txt"));
     Assert.assertEquals(4L, ds1.count());
 
-    Dataset<String> ds2 = spark.read().text(
+    Dataset<String> ds2 = spark.read().textFile(
       getResource("text-suite.txt"),
       getResource("text-suite2.txt"));
     Assert.assertEquals(5L, ds2.count());

http://git-wip-us.apache.org/repos/asf/spark/blob/b96e7f6a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 5695f6a..4ed517c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -36,7 +36,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SQLContext.read.text() API") {
-    verifyFrame(spark.read.text(testFile).toDF())
+    verifyFrame(spark.read.text(testFile))
   }
 
   test("SPARK-12562 verify write.text() can handle column name beyond 
`value`") {
@@ -45,7 +45,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
     val tempFile = Utils.createTempDir()
     tempFile.delete()
     df.write.text(tempFile.getCanonicalPath)
-    verifyFrame(spark.read.text(tempFile.getCanonicalPath).toDF())
+    verifyFrame(spark.read.text(tempFile.getCanonicalPath))
 
     Utils.deleteRecursively(tempFile)
   }
@@ -64,20 +64,20 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("reading partitioned data using read.text()") {
+  test("reading partitioned data using read.textFile()") {
     val partitionedData = Thread.currentThread().getContextClassLoader
       .getResource("text-partitioned").toString
-    val df = spark.read.text(partitionedData)
-    val data = df.collect()
+    val ds = spark.read.textFile(partitionedData)
+    val data = ds.collect()
 
-    assert(df.schema == new StructType().add("value", StringType))
+    assert(ds.schema == new StructType().add("value", StringType))
     assert(data.length == 2)
   }
 
-  test("support for partitioned reading") {
+  test("support for partitioned reading using read.text()") {
     val partitionedData = Thread.currentThread().getContextClassLoader
       .getResource("text-partitioned").toString
-    val df = spark.read.format("text").load(partitionedData)
+    val df = spark.read.text(partitionedData)
     val data = df.filter("year = '2015'").select("value").collect()
 
     assert(data(0) == Row("2015-test"))
@@ -94,7 +94,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
         val compressedFiles = new File(tempDirPath).listFiles()
         assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
-        verifyFrame(spark.read.text(tempDirPath).toDF())
+        verifyFrame(spark.read.text(tempDirPath))
     }
 
     val errMsg = intercept[IllegalArgumentException] {
@@ -121,7 +121,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
         .options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
       val compressedFiles = new File(tempDirPath).listFiles()
       assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
-      verifyFrame(spark.read.options(extraOptions).text(tempDirPath).toDF())
+      verifyFrame(spark.read.options(extraOptions).text(tempDirPath))
     }
   }
 

