This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b276788 [SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource b276788 is described below commit b276788d57b270d455ef6a7c5ed6cf8a74885dde Author: WeichenXu <weichen...@databricks.com> AuthorDate: Thu Jun 20 12:43:01 2019 +0800 [SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource ## What changes were proposed in this pull request? Provide a way to recursively load data from a data source. This adds a "recursiveFileLookup" option. When the "recursiveFileLookup" option is turned on, partition inference is turned off and all files under the directory are loaded recursively. If a data source explicitly specifies a partitionSpec and the user turns on the "recursiveFileLookup" option, an exception is thrown. ## How was this patch tested? Unit tests. Please review https://spark.apache.org/contributing.html before opening a pull request. Closes #24830 from WeichenXu123/recursive_ds. Authored-by: WeichenXu <weichen...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../ml/source/image/ImageFileFormatSuite.scala | 1 + .../datasources/PartitioningAwareFileIndex.scala | 48 +++++++++------ .../spark/sql/FileBasedDataSourceSuite.scala | 70 ++++++++++++++++++++++ 3 files changed, 101 insertions(+), 18 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala index 38e2513..38bb246 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala @@ -30,6 +30,7 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext { // Single column of images named "image" private lazy val imagePath = "../data/mllib/images/partitioned" + private lazy val recursiveImagePath = "../data/mllib/images" 
test("image datasource count test") { val df1 = spark.read.format("image").load(imagePath) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 3c93255..3adec2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -62,6 +62,10 @@ abstract class PartitioningAwareFileIndex( pathGlobFilter.forall(_.accept(file.getPath)) } + protected lazy val recursiveFileLookup = { + parameters.getOrElse("recursiveFileLookup", "false").toBoolean + } + override def listFiles( partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { def isNonEmptyFile(f: FileStatus): Boolean = { @@ -70,6 +74,10 @@ abstract class PartitioningAwareFileIndex( val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil } else { + if (recursiveFileLookup) { + throw new IllegalArgumentException( + "Datasource with partition do not allow recursive file loading.") + } prunePartitions(partitionFilters, partitionSpec()).map { case PartitionPath(values, path) => val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { @@ -95,7 +103,7 @@ abstract class PartitioningAwareFileIndex( override def sizeInBytes: Long = allFiles().map(_.getLen).sum def allFiles(): Seq[FileStatus] = { - val files = if (partitionSpec().partitionColumns.isEmpty) { + val files = if (partitionSpec().partitionColumns.isEmpty && !recursiveFileLookup) { // For each of the root input paths, get the list of files inside them rootPaths.flatMap { path => // Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles). 
@@ -128,23 +136,27 @@ abstract class PartitioningAwareFileIndex( } protected def inferPartitioning(): PartitionSpec = { - // We use leaf dirs containing data files to discover the schema. - val leafDirs = leafDirToChildrenFiles.filter { case (_, files) => - files.exists(f => isDataPath(f.getPath)) - }.keys.toSeq - - val caseInsensitiveOptions = CaseInsensitiveMap(parameters) - val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) - .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) - - PartitioningUtils.parsePartitions( - leafDirs, - typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, - basePaths = basePaths, - userSpecifiedSchema = userSpecifiedSchema, - caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis, - validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns, - timeZoneId = timeZoneId) + if (recursiveFileLookup) { + PartitionSpec.emptySpec + } else { + // We use leaf dirs containing data files to discover the schema. 
+ val leafDirs = leafDirToChildrenFiles.filter { case (_, files) => + files.exists(f => isDataPath(f.getPath)) + }.keys.toSeq + + val caseInsensitiveOptions = CaseInsensitiveMap(parameters) + val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) + .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) + + PartitioningUtils.parsePartitions( + leafDirs, + typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, + basePaths = basePaths, + userSpecifiedSchema = userSpecifiedSchema, + caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis, + validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns, + timeZoneId = timeZoneId) + } } private def prunePartitions( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index ea4794e..b2d6f01 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import java.io.{File, FilenameFilter, FileNotFoundException} +import java.nio.file.{Files, StandardOpenOption} import java.util.Locale import scala.collection.mutable @@ -572,6 +573,75 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo } } + test("Option recursiveFileLookup: recursive loading correctly") { + + val expectedFileList = mutable.ListBuffer[String]() + + def createFile(dir: File, fileName: String, format: String): Unit = { + val path = new File(dir, s"${fileName}.${format}") + Files.write( + path.toPath, + s"content of ${path.toString}".getBytes, + StandardOpenOption.CREATE, StandardOpenOption.WRITE + ) + val fsPath = new Path(path.getAbsoluteFile.toURI).toString + expectedFileList.append(fsPath) + } + + def createDir(path: File, dirName: String, level: Int): Unit = { + val dir = 
new File(path, s"dir${dirName}-${level}") + dir.mkdir() + createFile(dir, s"file${level}", "bin") + createFile(dir, s"file${level}", "text") + + if (level < 4) { + // create sub-dir + createDir(dir, "sub0", level + 1) + createDir(dir, "sub1", level + 1) + } + } + + withTempPath { path => + path.mkdir() + createDir(path, "root", 0) + + val dataPath = new File(path, "dirroot-0").getAbsolutePath + val fileList = spark.read.format("binaryFile") + .option("recursiveFileLookup", true) + .load(dataPath) + .select("path").collect().map(_.getString(0)) + + assert(fileList.toSet === expectedFileList.toSet) + + val fileList2 = spark.read.format("binaryFile") + .option("recursiveFileLookup", true) + .option("pathGlobFilter", "*.bin") + .load(dataPath) + .select("path").collect().map(_.getString(0)) + + assert(fileList2.toSet === expectedFileList.filter(_.endsWith(".bin")).toSet) + } + } + + test("Option recursiveFileLookup: disable partition inferring") { + val dataPath = Thread.currentThread().getContextClassLoader + .getResource("test-data/text-partitioned").toString + + val df = spark.read.format("binaryFile") + .option("recursiveFileLookup", true) + .load(dataPath) + + assert(!df.columns.contains("year"), "Expect partition inferring disabled") + val fileList = df.select("path").collect().map(_.getString(0)) + + val expectedFileList = Array( + dataPath + "/year=2014/data.txt", + dataPath + "/year=2015/data.txt" + ).map(path => new Path(path).toString) + + assert(fileList.toSet === expectedFileList.toSet) + } + test("Return correct results when data columns overlap with partition columns") { Seq("parquet", "orc", "json").foreach { format => withTempPath { path => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org