Repository: spark
Updated Branches:
  refs/heads/master 84b23453d -> 32be51fba


[SPARK-15323][SPARK-14463][SQL] Fix reading of partitioned format=text datasets

https://issues.apache.org/jira/browse/SPARK-15323

I was using partitioned text datasets in Spark 1.6.1, but reading them broke in Spark 2.0.0.

It would be logical if you could also write those,
but I'm not entirely sure how to solve this with the new Dataset implementation.

Also, it doesn't work with `sqlContext.read.text`, since that method returns a
`Dataset[String]`.
See https://issues.apache.org/jira/browse/SPARK-14463 for that issue.
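
For context, a minimal sketch of the two read paths this change affects (the `/data/logs` layout and the `spark` session below are illustrative, not part of the patch):

    // Hypothetical partitioned layout:
    //   /data/logs/year=2014/data.txt
    //   /data/logs/year=2015/data.txt

    // format("text").load keeps the discovered partition column in the schema:
    val df = spark.read.format("text").load("/data/logs")
    df.printSchema()  // value: string, year: int
    df.filter("year = 2015").select("value").show()

    // read.text returns a Dataset[String]; after this fix it selects only the
    // `value` column, so the partition column no longer breaks the String encoder:
    val ds = spark.read.text("/data/logs")
    ds.show()  // file contents only, no `year` column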

Author: Jurriaan Pruis <em...@jurriaanpruis.nl>

Closes #13104 from jurriaan/fix-partitioned-text-reads.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32be51fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32be51fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32be51fb

Branch: refs/heads/master
Commit: 32be51fba45f5e07a2a3520293c12dc7765a364d
Parents: 84b2345
Author: Jurriaan Pruis <em...@jurriaanpruis.nl>
Authored: Wed May 18 16:15:09 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed May 18 16:15:09 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala  |  3 ++-
 .../datasources/text/DefaultSource.scala        | 14 --------------
 .../text-partitioned/year=2014/data.txt         |  1 +
 .../text-partitioned/year=2015/data.txt         |  1 +
 .../execution/datasources/text/TextSuite.scala  | 20 ++++++++++++++++++++
 5 files changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 011aff4..e33fd83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -457,7 +457,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def text(paths: String*): Dataset[String] = {
-    format("text").load(paths : _*).as[String](sparkSession.implicits.newStringEncoder)
+    format("text").load(paths : _*).select("value")
+      .as[String](sparkSession.implicits.newStringEncoder)
   }
 
   ///////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index f22c024..f091615 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -83,19 +83,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     }
   }
 
-  override private[sql] def buildReaderWithPartitionValues(
-      sparkSession: SparkSession,
-      dataSchema: StructType,
-      partitionSchema: StructType,
-      requiredSchema: StructType,
-      filters: Seq[Filter],
-      options: Map[String, String],
-      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-    // Text data source doesn't support partitioning. Here we simply delegate to `buildReader`.
-    buildReader(
-      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
-  }
-
   override def buildReader(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -152,4 +139,3 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
     recordWriter.close(context)
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/test/resources/text-partitioned/year=2014/data.txt
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/text-partitioned/year=2014/data.txt b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt
new file mode 100644
index 0000000..e271942
--- /dev/null
+++ b/sql/core/src/test/resources/text-partitioned/year=2014/data.txt
@@ -0,0 +1 @@
+2014-test

http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/test/resources/text-partitioned/year=2015/data.txt
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/text-partitioned/year=2015/data.txt b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt
new file mode 100644
index 0000000..b8c03da
--- /dev/null
+++ b/sql/core/src/test/resources/text-partitioned/year=2015/data.txt
@@ -0,0 +1 @@
+2015-test

http://git-wip-us.apache.org/repos/asf/spark/blob/32be51fb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index f61fce5..b5e51e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -65,6 +65,26 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("reading partitioned data using read.text()") {
+    val partitionedData = Thread.currentThread().getContextClassLoader
+      .getResource("text-partitioned").toString
+    val df = spark.read.text(partitionedData)
+    val data = df.collect()
+
+    assert(df.schema == new StructType().add("value", StringType))
+    assert(data.length == 2)
+  }
+
+  test("support for partitioned reading") {
+    val partitionedData = Thread.currentThread().getContextClassLoader
+      .getResource("text-partitioned").toString
+    val df = spark.read.format("text").load(partitionedData)
+    val data = df.filter("year = '2015'").select("value").collect()
+
+    assert(data(0) == Row("2015-test"))
+    assert(data.length == 1)
+  }
+
   test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
     val testDf = spark.read.text(testFile)
     val extensionNameMap = Map("bzip2" -> ".bz2", "deflate" -> ".deflate", "gzip" -> ".gz")

