This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7614472  [SPARK-37876][CORE][SQL] Move `SpecificParquetRecordReaderBase.listDirectory` to `TestUtils`
7614472 is described below

commit 7614472950cb57ffefa0a51dd1163103c5d42df6
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sat Jan 15 09:01:55 2022 -0600

    [SPARK-37876][CORE][SQL] Move `SpecificParquetRecordReaderBase.listDirectory` to `TestUtils`

    ### What changes were proposed in this pull request?
    `SpecificParquetRecordReaderBase.listDirectory` returns the list of files under `path` recursively, skipping files that MapReduce normally ignores. The method is now used only by tests in Spark, and those tests also cover non-Parquet scenarios such as `OrcColumnarBatchReaderSuite`. This PR therefore moves the method from `SpecificParquetRecordReaderBase` to `TestUtils`, making it a test-only utility.

    ### Why are the changes needed?
    Refactoring: move a test-only method to `TestUtils`.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Pass GA (GitHub Actions).

    Closes #35177 from LuciferYang/list-directory.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../src/main/scala/org/apache/spark/TestUtils.scala  | 15 +++++++++++++++
 .../parquet/SpecificParquetRecordReaderBase.java     | 21 ---------------------
 .../benchmark/DataSourceReadBenchmark.scala          | 11 ++++++-----
 .../orc/OrcColumnarBatchReaderSuite.scala            |  4 ++--
 .../datasources/parquet/ParquetEncodingSuite.scala   | 11 ++++++-----
 .../datasources/parquet/ParquetIOSuite.scala         |  6 +++---
 .../execution/datasources/parquet/ParquetTest.scala  |  3 ++-
 .../spark/sql/test/DataFrameReaderWriterSuite.scala  |  5 ++---
 8 files changed, 36 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index d2af955..505b3ab 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -446,6 +446,21 @@ private[spark] object TestUtils {
     current ++ current.filter(_.isDirectory).flatMap(recursiveList)
   }
 
+  /**
+   * Returns the list of files at 'path' recursively. This skips files that are ignored normally
+   * by MapReduce.
+   */
+  def listDirectory(path: File): Array[String] = {
+    val result = ArrayBuffer.empty[String]
+    if (path.isDirectory) {
+      path.listFiles.foreach(f => result.appendAll(listDirectory(f)))
+    } else {
+      val c = path.getName.charAt(0)
+      if (c != '.' && c != '_') result.append(path.getAbsolutePath)
+    }
+    result.toArray
+  }
+
   /** Creates a temp JSON file that contains the input JSON record. */
   def createTempJsonFile(dir: File, prefix: String, jsonValue: JValue): String = {
     val file = File.createTempFile(prefix, ".json", dir)
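(Illustration, not part of the patch.) The relocated helper walks `path` and drops anything whose name starts with '.' or '_', i.e. hidden and marker files such as `.crc` checksums and `_SUCCESS` that MapReduce-style output directories contain. Below is a minimal, self-contained Scala sketch of that behavior; the `listDirectory` body is restated from the `TestUtils` addition above, and the file names are invented for the demo:

    import java.io.File
    import java.nio.file.Files

    import scala.collection.mutable.ArrayBuffer

    object ListDirectoryDemo {
      // Same logic as TestUtils.listDirectory above, restated here so the demo
      // compiles without Spark's core test utilities on the classpath.
      def listDirectory(path: File): Array[String] = {
        val result = ArrayBuffer.empty[String]
        if (path.isDirectory) {
          path.listFiles.foreach(f => result.appendAll(listDirectory(f)))
        } else {
          val c = path.getName.charAt(0)
          if (c != '.' && c != '_') result.append(path.getAbsolutePath)
        }
        result.toArray
      }

      def main(args: Array[String]): Unit = {
        val dir = Files.createTempDirectory("list-directory-demo").toFile
        new File(dir, "part-00000.snappy.parquet").createNewFile()      // returned
        new File(dir, "_SUCCESS").createNewFile()                       // skipped: starts with '_'
        new File(dir, ".part-00000.snappy.parquet.crc").createNewFile() // skipped: starts with '.'
        listDirectory(dir).foreach(println)  // prints only the data file's absolute path
      }
    }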
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index e1a0607..07e35c1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -19,10 +19,8 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -122,25 +120,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   }
 
   /**
-   * Returns the list of files at 'path' recursively. This skips files that are ignored normally
-   * by MapReduce.
-   */
-  public static List<String> listDirectory(File path) {
-    List<String> result = new ArrayList<>();
-    if (path.isDirectory()) {
-      for (File f: path.listFiles()) {
-        result.addAll(listDirectory(f));
-      }
-    } else {
-      char c = path.getName().charAt(0);
-      if (c != '.' && c != '_') {
-        result.add(path.getAbsolutePath());
-      }
-    }
-    return result;
-  }
-
-  /**
    * Initializes the reader to read the file at `path` with `columns` projected. If columns is
    * null, all the columns are projected.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index 31cee48..5094cdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -25,10 +25,11 @@ import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.SparkConf
+import org.apache.spark.TestUtils
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.parquet.{SpecificParquetRecordReaderBase, VectorizedParquetRecordReader}
+import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
@@ -167,7 +168,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
       sqlBenchmark.run()
 
       // Driving the parquet reader in batch mode directly.
-      val files = SpecificParquetRecordReaderBase.listDirectory(new File(dir, "parquet")).toArray
+      val files = TestUtils.listDirectory(new File(dir, "parquet"))
       val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
       val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
       parquetReaderBenchmark.addCase("ParquetReader Vectorized") { _ =>
@@ -183,7 +184,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
           case DoubleType => (col: ColumnVector, i: Int) => doubleSum += col.getDouble(i)
         }
 
-        files.map(_.asInstanceOf[String]).foreach { p =>
+        files.foreach { p =>
           val reader = new VectorizedParquetRecordReader(
             enableOffHeapColumnVector, vectorizedReaderBatchSize)
           try {
@@ -468,12 +469,12 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
         }
       }
 
-      val files = SpecificParquetRecordReaderBase.listDirectory(new File(dir, "parquet")).toArray
+      val files = TestUtils.listDirectory(new File(dir, "parquet"))
       val enableOffHeapColumnVector = spark.sessionState.conf.offHeapColumnVectorEnabled
       val vectorizedReaderBatchSize = spark.sessionState.conf.parquetVectorizedReaderBatchSize
       benchmark.addCase("ParquetReader Vectorized") { num =>
         var sum = 0
-        files.map(_.asInstanceOf[String]).foreach { p =>
+        files.foreach { p =>
           val reader = new VectorizedParquetRecordReader(
             enableOffHeapColumnVector, vectorizedReaderBatchSize)
           try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
index bfcef46..4ff9612 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReaderSuite.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.orc.TypeDescription
 
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -117,7 +117,7 @@ class OrcColumnarBatchReaderSuite extends QueryTest with SharedSparkSession {
     dataTypes.zip(constantValues).foreach { case (dt, v) =>
       val schema = StructType(StructField("col1", IntegerType) :: StructField("pcol", dt) :: Nil)
       val partitionValues = new GenericInternalRow(Array(v))
-      val file = new File(SpecificParquetRecordReaderBase.listDirectory(dir).get(0))
+      val file = new File(TestUtils.listDirectory(dir).head)
       val fileSplit = new FileSplit(new Path(file.getCanonicalPath), 0L, file.length, Array.empty)
       val taskConf = sqlContext.sessionState.newHadoopConf()
       val orcFileSchema = TypeDescription.fromString(schema.simpleString)
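(Illustration, not part of the patch.) Because the Scala replacement returns `Array[String]` rather than the Java method's `java.util.List<String>`, the call sites above no longer need `.toArray`, `.get(0)`, `asScala`, or `.map(_.asInstanceOf[String])`; they use `.head` and `foreach` directly. A sketch of the resulting call-site shape, assuming it lives in Spark's own sql/core test sources (where the `private[spark]` `TestUtils` and `VectorizedParquetRecordReader` are visible) and that `dir` already holds Parquet output written by a test; the object and method names are hypothetical:

    package org.apache.spark.sql.execution.datasources.parquet

    import java.io.File

    import org.apache.spark.TestUtils

    object ListAndReadSketch {
      // Counts all rows under `dir` by driving the vectorized reader directly,
      // mirroring DataSourceReadBenchmark and ParquetEncodingSuite above.
      def countRows(dir: File, offHeap: Boolean, batchSize: Int): Long = {
        val files: Array[String] = TestUtils.listDirectory(dir)
        var rows = 0L
        files.foreach { path =>
          val reader = new VectorizedParquetRecordReader(offHeap, batchSize)
          try {
            reader.initialize(path, null)  // null projects all columns
            while (reader.nextBatch()) {
              rows += reader.resultBatch().numRows()
            }
          } finally {
            reader.close()
          }
        }
        rows
      }
    }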
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 746d9c6..f7100a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -50,12 +51,12 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
     (1 :: 1000 :: Nil).foreach { n => {
       withTempPath { dir =>
         List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
-        val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
+        val file = TestUtils.listDirectory(dir).head
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
-        reader.initialize(file.asInstanceOf[String], null)
+        reader.initialize(file, null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
         assert(batch.numRows() == n)
@@ -80,12 +81,12 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
       withTempPath { dir =>
         val data = List.fill(n)(NULL_ROW).toDF
         data.repartition(1).write.parquet(dir.getCanonicalPath)
-        val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
+        val file = TestUtils.listDirectory(dir).head
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
-        reader.initialize(file.asInstanceOf[String], null)
+        reader.initialize(file, null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
         assert(batch.numRows() == n)
@@ -114,7 +115,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess
       // first page is dictionary encoded and the remaining two are plain encoded.
       val data = (0 until 512).flatMap(i => Seq.fill(3)(i.toString))
       data.toDF("f").coalesce(1).write.parquet(dir.getCanonicalPath)
-      val file = SpecificParquetRecordReaderBase.listDirectory(dir).asScala.head
+      val file = TestUtils.listDirectory(dir).head
 
       val conf = sqlContext.conf
       val reader = new VectorizedParquetRecordReader(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 0966319..1e2bb91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,7 +38,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -928,7 +928,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
     withTempPath { dir =>
       spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
-      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
+      val file = TestUtils.listDirectory(dir).head;
       {
         val conf = sqlContext.conf
         val reader = new VectorizedParquetRecordReader(
@@ -1032,7 +1032,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
         val vectorizedReader = new VectorizedParquetRecordReader(
           conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize)
         val partitionValues = new GenericInternalRow(Array(v))
-        val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+        val file = TestUtils.listDirectory(dir).head
 
         try {
           vectorizedReader.initialize(file, null)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 4772316..7a7957c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -33,6 +33,7 @@ import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetM
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.MessageType
 
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
 import org.apache.spark.sql.internal.SQLConf
@@ -179,7 +180,7 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest {
   }
 
   def getMetaData(dir: java.io.File): Map[String, String] = {
-    val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+    val file = TestUtils.listDirectory(dir).head
     val conf = new Configuration()
     val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
     val parquetReadOptions = HadoopReadOptions.builder(conf).build()
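(Illustration, not part of the patch.) `ParquetTest.getMetaData` above and the `DataFrameReaderWriterSuite` test below both take the first file returned by `TestUtils.listDirectory` and open its Parquet footer. A simplified, self-contained sketch of that pattern using only parquet-hadoop and Hadoop APIs; the object and method names are hypothetical, and the inline file filter stands in for `TestUtils.listDirectory(dir).head`, which is only reachable from Spark's own test sources:

    import java.io.File

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    object FooterMetadataSketch {
      // Reads the key/value metadata from the footer of the first non-hidden
      // file under `dir` (the file a test would get from listDirectory(dir).head).
      def firstFileKeyValueMetadata(dir: File): java.util.Map[String, String] = {
        val first = dir.listFiles()
          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
          .head
          .getAbsolutePath
        val inputFile = HadoopInputFile.fromPath(new Path(first), new Configuration())
        val reader = ParquetFileReader.open(inputFile)
        try {
          reader.getFooter.getFileMetaData.getKeyValueMetaData
        } finally {
          reader.close()
        }
      }
    }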
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index ea007c1..cb3bd29 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -32,7 +32,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, TestUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -42,7 +42,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Ove
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
-import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -764,7 +763,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     withTempPath { dir =>
       val path = dir.getAbsolutePath
       df.write.mode("overwrite").parquet(path)
-      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+      val file = TestUtils.listDirectory(dir).head
       val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), new Configuration())
       val f = ParquetFileReader.open(hadoopInputFile)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org