This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new facf9c3  [SPARK-28204][SQL][TESTS] Make separate two test cases for 
column pruning in binary files
facf9c3 is described below

commit facf9c30a283ec682b5adb2e7afdbf5d011e3808
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Sat Jun 29 14:05:23 2019 +0900

    [SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning 
in binary files
    
    ## What changes were proposed in this pull request?
    
    SPARK-27534 did not address my own review comments at 
https://github.com/WeichenXu123/spark/pull/8
    It is better to push this in, since the code has already been cleaned up.
    
    ## How was this patch tested?
    
    Unit tests fixed.
    
    Closes #25003 from HyukjinKwon/SPARK-27534.
    
    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../binaryfile/BinaryFileFormatSuite.scala         | 88 +++++++++++-----------
 1 file changed, 43 insertions(+), 45 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 9e2969b..a66b34f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -290,56 +290,54 @@ class BinaryFileFormatSuite extends QueryTest with 
SharedSQLContext with SQLTest
     ), true)
   }
 
+  private def readBinaryFile(file: File, requiredSchema: StructType): Row = {
+    val format = new BinaryFileFormat
+    val reader = format.buildReaderWithPartitionValues(
+      sparkSession = spark,
+      dataSchema = schema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = requiredSchema,
+      filters = Seq.empty,
+      options = Map.empty,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val partitionedFile = mock(classOf[PartitionedFile])
+    when(partitionedFile.filePath).thenReturn(file.getPath)
+    val encoder = RowEncoder(requiredSchema).resolveAndBind()
+    encoder.fromRow(reader(partitionedFile).next())
+  }
+
   test("column pruning") {
-    def getRequiredSchema(fieldNames: String*): StructType = {
-      StructType(fieldNames.map {
-        case f if schema.fieldNames.contains(f) => schema(f)
-        case other => StructField(other, NullType)
-      })
-    }
-    def read(file: File, requiredSchema: StructType): Row = {
-      val format = new BinaryFileFormat
-      val reader = format.buildReaderWithPartitionValues(
-        sparkSession = spark,
-        dataSchema = schema,
-        partitionSchema = StructType(Nil),
-        requiredSchema = requiredSchema,
-        filters = Seq.empty,
-        options = Map.empty,
-        hadoopConf = spark.sessionState.newHadoopConf()
-      )
-      val partitionedFile = mock(classOf[PartitionedFile])
-      when(partitionedFile.filePath).thenReturn(file.getPath)
-      val encoder = RowEncoder(requiredSchema).resolveAndBind()
-      encoder.fromRow(reader(partitionedFile).next())
-    }
-    val file = new File(Utils.createTempDir(), "data")
-    val content = "123".getBytes
-    Files.write(file.toPath, content, StandardOpenOption.CREATE, 
StandardOpenOption.WRITE)
-
-    read(file, getRequiredSchema(MODIFICATION_TIME, CONTENT, LENGTH, PATH)) 
match {
-      case Row(t, c, len, p) =>
-        assert(t === new Timestamp(file.lastModified()))
-        assert(c === content)
-        assert(len === content.length)
-        assert(p.asInstanceOf[String].endsWith(file.getAbsolutePath))
+    withTempPath { file =>
+      val content = "123".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, 
StandardOpenOption.WRITE)
+
+      val actual = readBinaryFile(file, StructType(schema.takeRight(3)))
+      val expected = Row(new Timestamp(file.lastModified()), content.length, 
content)
+
+      assert(actual === expected)
     }
-    file.setReadable(false)
-    withClue("cannot read content") {
+  }
+
+  test("column pruning - non-readable file") {
+    withTempPath { file =>
+      val content = "abc".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, 
StandardOpenOption.WRITE)
+      file.setReadable(false)
+
+      // If content is selected, it throws an exception because it's not 
readable.
       intercept[IOException] {
-        read(file, getRequiredSchema(CONTENT))
+        readBinaryFile(file, StructType(schema(CONTENT) :: Nil))
       }
-    }
-    assert(read(file, getRequiredSchema(LENGTH)) === Row(content.length),
-      "Get length should not read content.")
-    intercept[RuntimeException] {
-      read(file, getRequiredSchema(LENGTH, "other"))
-    }
 
-    val df = spark.read.format(BINARY_FILE).load(file.getPath)
-    assert(df.count() === 1, "Count should not read content.")
-    assert(df.select("LENGTH").first().getLong(0) === content.length,
-      "column pruning should be case insensitive")
+      // Otherwise, it should be able to read.
+      assert(
+        readBinaryFile(file, StructType(schema(LENGTH) :: Nil)) === 
Row(content.length),
+        "Get length should not read content.")
+      assert(
+        spark.read.format(BINARY_FILE).load(file.getPath).count() === 1,
+        "Count should not read content.")
+    }
   }
 
   test("fail fast and do not attempt to read if a file is too big") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to