Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8fb125bdf -> cb254ecb1


[SPARK-14343][SQL] Proper column pruning for text data source

## What changes were proposed in this pull request?

The text data source ignores the requested schema and may return wrong 
results when its only data column is not requested. This can happen when 
only the partitioning column(s) of a partitioned text table are selected.
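
A minimal repro sketch (the `/tmp/t` path is hypothetical; the shipped 
regression test in `TextSuite` appears in the diff below):

```scala
// Hypothetical repro: a text table partitioned by `part`.
spark.range(1).selectExpr("CONCAT('val_', id)").write.text("/tmp/t/part=a")
spark.range(1).selectExpr("CONCAT('val_', id)").write.text("/tmp/t/part=b")

// Only the partitioning column is selected, so the text source's own
// requiredSchema is empty; before this fix the result could be wrong.
spark.read.format("text").load("/tmp/t").select("part").show()
```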

## How was this patch tested?

New test case added in `TextSuite`.

Author: Cheng Lian <l...@databricks.com>

Closes #13431 from liancheng/spark-14343-partitioned-text-table.

(cherry picked from commit 1f43562daf9454428796317203d9dcc9030a46eb)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb254ecb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb254ecb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb254ecb

Branch: refs/heads/branch-2.0
Commit: cb254ecb1f351c00f7fe4c3c9cc41c46beda90b4
Parents: 8fb125b
Author: Cheng Lian <l...@databricks.com>
Authored: Wed Jun 1 07:30:55 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Jun 1 07:31:52 2016 -0700

----------------------------------------------------------------------
 .../datasources/text/TextFileFormat.scala       | 31 +++++++++++++-------
 .../execution/datasources/text/TextSuite.scala  | 17 +++++++++--
 2 files changed, 35 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb254ecb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 1e5bce4..9c03ab2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -92,20 +92,31 @@ class TextFileFormat extends FileFormat with DataSourceRegister {
       filters: Seq[Filter],
       options: Map[String, String],
       hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    assert(
+      requiredSchema.length <= 1,
+      "Text data source only produces a single data column named \"value\".")
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
-      val unsafeRow = new UnsafeRow(1)
-      val bufferHolder = new BufferHolder(unsafeRow)
-      val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
-
-      new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
-        // Writes to an UnsafeRow directly
-        bufferHolder.reset()
-        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
-        unsafeRow.setTotalSize(bufferHolder.totalSize())
-        unsafeRow
+      val reader = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
+
+      if (requiredSchema.isEmpty) {
+        val emptyUnsafeRow = new UnsafeRow(0)
+        reader.map(_ => emptyUnsafeRow)
+      } else {
+        val unsafeRow = new UnsafeRow(1)
+        val bufferHolder = new BufferHolder(unsafeRow)
+        val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+        reader.map { line =>
+          // Writes to an UnsafeRow directly
+          bufferHolder.reset()
+          unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+          unsafeRow.setTotalSize(bufferHolder.totalSize())
+          unsafeRow
+        }
       }
     }
   }

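The heart of the change above: when `requiredSchema` is empty (only 
partition columns were requested), the reader still emits one row per 
input line, reusing a single zero-column `UnsafeRow`. A minimal sketch of 
that pattern, assuming Spark is on the classpath:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// One zero-column row is reused for every input line. Sharing the same
// instance is safe because the row carries no data; only the number of
// emitted rows matters (it drives counts and partition-column output).
val emptyUnsafeRow = new UnsafeRow(0)
val lines = Iterator("line1", "line2", "line3")
val rows = lines.map(_ => emptyUnsafeRow)
println(rows.size) // 3: one row per line, row count preserved
```
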
http://git-wip-us.apache.org/repos/asf/spark/blob/cb254ecb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index b5e51e9..7b6981f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.execution.datasources.text
 
 import java.io.File
 
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
@@ -31,6 +28,7 @@ import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
 
 class TextSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
 
   test("reading text file") {
     verifyFrame(spark.read.format("text").load(testFile))
@@ -126,6 +124,19 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-14343: select partitioning column") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val ds1 = spark.range(1).selectExpr("CONCAT('val_', id)")
+      ds1.write.text(s"$path/part=a")
+      ds1.write.text(s"$path/part=b")
+
+      checkDataset(
+        spark.read.format("text").load(path).select($"part"),
+        Row("a"), Row("b"))
+    }
+  }
+
   private def testFile: String = {
     Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString
   }

