This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b276788  [SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource
b276788 is described below

commit b276788d57b270d455ef6a7c5ed6cf8a74885dde
Author: WeichenXu <weichen...@databricks.com>
AuthorDate: Thu Jun 20 12:43:01 2019 +0800

    [SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource
    
    ## What changes were proposed in this pull request?
    
    Provide a way to recursively load data from a datasource by adding a
    "recursiveFileLookup" option.
    
    When the "recursiveFileLookup" option is turned on, partition inference is
    turned off and all files under the input directory are loaded recursively.
    
    If a datasource explicitly specifies a partitionSpec and the user turns on
    the "recursiveFileLookup" option, an exception is thrown.
    
    ## How was this patch tested?
    
    Unit tests.
    
    Please review https://spark.apache.org/contributing.html before opening a pull request.
    
    Closes #24830 from WeichenXu123/recursive_ds.
    
    Authored-by: WeichenXu <weichen...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../ml/source/image/ImageFileFormatSuite.scala     |  1 +
 .../datasources/PartitioningAwareFileIndex.scala   | 48 +++++++++------
 .../spark/sql/FileBasedDataSourceSuite.scala       | 70 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 18 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
index 38e2513..38bb246 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
@@ -30,6 +30,7 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   // Single column of images named "image"
   private lazy val imagePath = "../data/mllib/images/partitioned"
+  private lazy val recursiveImagePath = "../data/mllib/images"
 
   test("image datasource count test") {
     val df1 = spark.read.format("image").load(imagePath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 3c93255..3adec2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -62,6 +62,10 @@ abstract class PartitioningAwareFileIndex(
     pathGlobFilter.forall(_.accept(file.getPath))
   }
 
+  protected lazy val recursiveFileLookup = {
+    parameters.getOrElse("recursiveFileLookup", "false").toBoolean
+  }
+
   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -70,6 +74,10 @@ abstract class PartitioningAwareFileIndex(
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
       PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
     } else {
+      if (recursiveFileLookup) {
+        throw new IllegalArgumentException(
+          "Datasource with partition do not allow recursive file loading.")
+      }
       prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
@@ -95,7 +103,7 @@ abstract class PartitioningAwareFileIndex(
   override def sizeInBytes: Long = allFiles().map(_.getLen).sum
 
   def allFiles(): Seq[FileStatus] = {
-    val files = if (partitionSpec().partitionColumns.isEmpty) {
+    val files = if (partitionSpec().partitionColumns.isEmpty && !recursiveFileLookup) {
       // For each of the root input paths, get the list of files inside them
       rootPaths.flatMap { path =>
         // Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles).
@@ -128,23 +136,27 @@ abstract class PartitioningAwareFileIndex(
   }
 
   protected def inferPartitioning(): PartitionSpec = {
-    // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
-      files.exists(f => isDataPath(f.getPath))
-    }.keys.toSeq
-
-    val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
-    val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
-      .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    PartitioningUtils.parsePartitions(
-      leafDirs,
-      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
-      basePaths = basePaths,
-      userSpecifiedSchema = userSpecifiedSchema,
-      caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
-      validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
-      timeZoneId = timeZoneId)
+    if (recursiveFileLookup) {
+      PartitionSpec.emptySpec
+    } else {
+      // We use leaf dirs containing data files to discover the schema.
+      val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
+        files.exists(f => isDataPath(f.getPath))
+      }.keys.toSeq
+
+      val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
+      val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
+        .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
+
+      PartitioningUtils.parsePartitions(
+        leafDirs,
+        typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+        basePaths = basePaths,
+        userSpecifiedSchema = userSpecifiedSchema,
+        caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
+        validatePartitionColumns = sparkSession.sqlContext.conf.validatePartitionColumns,
+        timeZoneId = timeZoneId)
+    }
   }
 
   private def prunePartitions(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index ea4794e..b2d6f01 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.io.{File, FilenameFilter, FileNotFoundException}
+import java.nio.file.{Files, StandardOpenOption}
 import java.util.Locale
 
 import scala.collection.mutable
@@ -572,6 +573,75 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
+  test("Option recursiveFileLookup: recursive loading correctly") {
+
+    val expectedFileList = mutable.ListBuffer[String]()
+
+    def createFile(dir: File, fileName: String, format: String): Unit = {
+      val path = new File(dir, s"${fileName}.${format}")
+      Files.write(
+        path.toPath,
+        s"content of ${path.toString}".getBytes,
+        StandardOpenOption.CREATE, StandardOpenOption.WRITE
+      )
+      val fsPath = new Path(path.getAbsoluteFile.toURI).toString
+      expectedFileList.append(fsPath)
+    }
+
+    def createDir(path: File, dirName: String, level: Int): Unit = {
+      val dir = new File(path, s"dir${dirName}-${level}")
+      dir.mkdir()
+      createFile(dir, s"file${level}", "bin")
+      createFile(dir, s"file${level}", "text")
+
+      if (level < 4) {
+        // create sub-dir
+        createDir(dir, "sub0", level + 1)
+        createDir(dir, "sub1", level + 1)
+      }
+    }
+
+    withTempPath { path =>
+      path.mkdir()
+      createDir(path, "root", 0)
+
+      val dataPath = new File(path, "dirroot-0").getAbsolutePath
+      val fileList = spark.read.format("binaryFile")
+        .option("recursiveFileLookup", true)
+        .load(dataPath)
+        .select("path").collect().map(_.getString(0))
+
+      assert(fileList.toSet === expectedFileList.toSet)
+
+      val fileList2 = spark.read.format("binaryFile")
+        .option("recursiveFileLookup", true)
+        .option("pathGlobFilter", "*.bin")
+        .load(dataPath)
+        .select("path").collect().map(_.getString(0))
+
+      assert(fileList2.toSet === expectedFileList.filter(_.endsWith(".bin")).toSet)
+    }
+  }
+
+  test("Option recursiveFileLookup: disable partition inferring") {
+    val dataPath = Thread.currentThread().getContextClassLoader
+      .getResource("test-data/text-partitioned").toString
+
+    val df = spark.read.format("binaryFile")
+      .option("recursiveFileLookup", true)
+      .load(dataPath)
+
+    assert(!df.columns.contains("year"), "Expect partition inferring disabled")
+    val fileList = df.select("path").collect().map(_.getString(0))
+
+    val expectedFileList = Array(
+      dataPath + "/year=2014/data.txt",
+      dataPath + "/year=2015/data.txt"
+    ).map(path => new Path(path).toString)
+
+    assert(fileList.toSet === expectedFileList.toSet)
+  }
+
   test("Return correct results when data columns overlap with partition 
columns") {
     Seq("parquet", "orc", "json").foreach { format =>
       withTempPath { path =>

