This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new facf9c3  [SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning in binary files
facf9c3 is described below

commit facf9c30a283ec682b5adb2e7afdbf5d011e3808
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Sat Jun 29 14:05:23 2019 +0900

    [SPARK-28204][SQL][TESTS] Make separate two test cases for column pruning in binary files

    ## What changes were proposed in this pull request?

    SPARK-27534 missed addressing my own comments at https://github.com/WeichenXu123/spark/pull/8
    It is better to push this in since the code is already cleaned up.

    ## How was this patch tested?

    Unit tests fixed.

    Closes #25003 from HyukjinKwon/SPARK-27534.

    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../binaryfile/BinaryFileFormatSuite.scala         | 88 +++++++++++-----------
 1 file changed, 43 insertions(+), 45 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 9e2969b..a66b34f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -290,56 +290,54 @@ class BinaryFileFormatSuite extends QueryTest with SharedSQLContext with SQLTest
       ), true)
   }
 
+  private def readBinaryFile(file: File, requiredSchema: StructType): Row = {
+    val format = new BinaryFileFormat
+    val reader = format.buildReaderWithPartitionValues(
+      sparkSession = spark,
+      dataSchema = schema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = requiredSchema,
+      filters = Seq.empty,
+      options = Map.empty,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val partitionedFile = mock(classOf[PartitionedFile])
+    when(partitionedFile.filePath).thenReturn(file.getPath)
+    val encoder = RowEncoder(requiredSchema).resolveAndBind()
+    encoder.fromRow(reader(partitionedFile).next())
+  }
+
   test("column pruning") {
-    def getRequiredSchema(fieldNames: String*): StructType = {
-      StructType(fieldNames.map {
-        case f if schema.fieldNames.contains(f) => schema(f)
-        case other => StructField(other, NullType)
-      })
-    }
-    def read(file: File, requiredSchema: StructType): Row = {
-      val format = new BinaryFileFormat
-      val reader = format.buildReaderWithPartitionValues(
-        sparkSession = spark,
-        dataSchema = schema,
-        partitionSchema = StructType(Nil),
-        requiredSchema = requiredSchema,
-        filters = Seq.empty,
-        options = Map.empty,
-        hadoopConf = spark.sessionState.newHadoopConf()
-      )
-      val partitionedFile = mock(classOf[PartitionedFile])
-      when(partitionedFile.filePath).thenReturn(file.getPath)
-      val encoder = RowEncoder(requiredSchema).resolveAndBind()
-      encoder.fromRow(reader(partitionedFile).next())
-    }
-    val file = new File(Utils.createTempDir(), "data")
-    val content = "123".getBytes
-    Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
-
-    read(file, getRequiredSchema(MODIFICATION_TIME, CONTENT, LENGTH, PATH)) match {
-      case Row(t, c, len, p) =>
-        assert(t === new Timestamp(file.lastModified()))
-        assert(c === content)
-        assert(len === content.length)
-        assert(p.asInstanceOf[String].endsWith(file.getAbsolutePath))
+    withTempPath { file =>
+      val content = "123".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+
+      val actual = readBinaryFile(file, StructType(schema.takeRight(3)))
+      val expected = Row(new Timestamp(file.lastModified()), content.length, content)
+
+      assert(actual === expected)
     }
-    file.setReadable(false)
-    withClue("cannot read content") {
+  }
+
+  test("column pruning - non-readable file") {
+    withTempPath { file =>
+      val content = "abc".getBytes
+      Files.write(file.toPath, content, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+      file.setReadable(false)
+
+      // If content is selected, it throws an exception because it's not readable.
       intercept[IOException] {
-        read(file, getRequiredSchema(CONTENT))
+        readBinaryFile(file, StructType(schema(CONTENT) :: Nil))
       }
-    }
-    assert(read(file, getRequiredSchema(LENGTH)) === Row(content.length),
-      "Get length should not read content.")
-    intercept[RuntimeException] {
-      read(file, getRequiredSchema(LENGTH, "other"))
-    }
-    val df = spark.read.format(BINARY_FILE).load(file.getPath)
-    assert(df.count() === 1, "Count should not read content.")
-    assert(df.select("LENGTH").first().getLong(0) === content.length,
-      "column pruning should be case insensitive")
+
+      // Otherwise, it should be able to read.
+      assert(
+        readBinaryFile(file, StructType(schema(LENGTH) :: Nil)) === Row(content.length),
+        "Get length should not read content.")
+      assert(
+        spark.read.format(BINARY_FILE).load(file.getPath).count() === 1,
+        "Count should not read content.")
+    }
   }
 
   test("fail fast and do not attempt to read if a file is too big") {
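Note on the data source under test: the binaryFile format exposes a fixed schema of path, modificationTime, length and content, and the rewritten tests assert that pruning the projection down to length never forces the reader to open the file bytes. Below is a minimal usage sketch in Scala; it is not part of this commit, and the directory path and app name are hypothetical.

// Minimal sketch, assuming a Spark build that ships the "binaryFile" source.
import org.apache.spark.sql.SparkSession

object BinaryFilePruningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("binary-file-pruning-sketch")  // hypothetical app name
      .master("local[*]")
      .getOrCreate()

    // One row per file: path, modificationTime, length, content.
    val df = spark.read.format("binaryFile").load("/tmp/binary-data")  // hypothetical path

    // Pruning to `length` means the reader never has to materialize the file
    // contents; the tests in this commit assert that behavior directly against
    // BinaryFileFormat.buildReaderWithPartitionValues.
    df.select("length").show()

    spark.stop()
  }
}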
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org