Repository: spark
Updated Branches:
  refs/heads/branch-2.0 838143a2a -> 60bd704b5


[SPARK-16121] ListingFileCatalog does not list in parallel anymore

## What changes were proposed in this pull request?
Seems the fix of SPARK-14959 breaks the parallel partitioning discovery. This 
PR fixes the problem

## How was this patch tested?
Tested manually. (This PR also adds a proper test for SPARK-14959)

Author: Yin Huai <yh...@databricks.com>

Closes #13830 from yhuai/SPARK-16121.

(cherry picked from commit 39ad53f7ffddae5ba0ff0a76089ba671b14c44c8)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60bd704b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60bd704b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60bd704b

Branch: refs/heads/branch-2.0
Commit: 60bd704b541c4d1991922ffd3dd5b47de9bd5821
Parents: 838143a
Author: Yin Huai <yh...@databricks.com>
Authored: Wed Jun 22 18:07:07 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Jun 22 18:07:27 2016 +0800

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala        | 58 ++++++++++++++++++--
 .../datasources/fileSourceInterfaces.scala      |  7 ++-
 .../datasources/FileSourceStrategySuite.scala   | 45 ++++++++++++++-
 3 files changed, 101 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/60bd704b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index f713fde..675e755 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.SparkSession
@@ -73,21 +73,67 @@ class ListingFileCatalog(
     cachedPartitionSpec = null
   }
 
-  protected def listLeafFiles(paths: Seq[Path]): 
mutable.LinkedHashSet[FileStatus] = {
+  /**
+   * List leaf files of given paths. This method will submit a Spark job to do 
parallel
+   * listing whenever there is a path having more files than the parallel 
partition discovery
+   * discovery threshold.
+   */
+  protected[spark] def listLeafFiles(paths: Seq[Path]): 
mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
+      // Right now, the number of paths is less than the value of
+      // parallelPartitionDiscoveryThreshold. So, we will list file statues at 
the driver.
+      // If there is any child that has more files than the threshold, we will 
use parallel
+      // listing.
+
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(hadoopConf, this.getClass)
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logTrace(s"Listing $path on driver")
-        Try {
-          HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), 
pathFilter)
-        }.getOrElse(Array.empty[FileStatus])
+
+        val childStatuses = {
+          // TODO: We need to avoid of using Try at here.
+          val stats = 
Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+          if (pathFilter != null) stats.filter(f => 
pathFilter.accept(f.getPath)) else stats
+        }
+
+        childStatuses.map {
+          case f: LocatedFileStatus => f
+
+          // NOTE:
+          //
+          // - Although S3/S3A/S3N file system can be quite slow for remote 
file metadata
+          //   operations, calling `getFileBlockLocations` does no harm here 
since these file system
+          //   implementations don't actually issue RPC for this method.
+          //
+          // - Here we are calling `getFileBlockLocations` in a sequential 
manner, but it should not
+          //   be a big deal since we always use to `listLeafFilesInParallel` 
when the number of
+          //   paths exceeds threshold.
+          case f =>
+            if (f.isDirectory ) {
+              // If f is a directory, we do not need to call 
getFileBlockLocations (SPARK-14959).
+              f
+            } else {
+              HadoopFsRelation.createLocatedFileStatus(f, 
fs.getFileBlockLocations(f, 0, f.getLen))
+            }
+        }
+      }.filterNot { status =>
+        val name = status.getPath.getName
+        HadoopFsRelation.shouldFilterOut(name)
+      }
+
+      val (dirs, files) = statuses.partition(_.isDirectory)
+
+      // It uses [[LinkedHashSet]] since the order of files can affect the 
results. (SPARK-11500)
+      if (dirs.isEmpty) {
+        mutable.LinkedHashSet(files: _*)
+      } else {
+        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
       }
-      mutable.LinkedHashSet(statuses: _*)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/60bd704b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 5689cf6..20399e1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -391,13 +391,14 @@ private[sql] object HadoopFsRelation extends Logging {
     logTrace(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
     if (shouldFilterOut(name)) {
-      Array.empty
+      Array.empty[FileStatus]
     } else {
       val statuses = {
         val (dirs, files) = 
fs.listStatus(status.getPath).partition(_.isDirectory)
         val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, 
filter))
         if (filter != null) stats.filter(f => filter.accept(f.getPath)) else 
stats
       }
+      // statuses do not have any dirs.
       statuses.filterNot(status => 
shouldFilterOut(status.getPath.getName)).map {
         case f: LocatedFileStatus => f
 
@@ -459,7 +460,9 @@ private[sql] object HadoopFsRelation extends Logging {
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       paths.map(new Path(_)).flatMap { path =>
         val fs = path.getFileSystem(serializableConfiguration.value)
-        Try(listLeafFiles(fs, fs.getFileStatus(path), 
pathFilter)).getOrElse(Array.empty)
+        // TODO: We need to avoid of using Try at here.
+        Try(listLeafFiles(fs, fs.getFileStatus(path), pathFilter))
+          .getOrElse(Array.empty[FileStatus])
       }
     }.map { status =>
       val blockLocations = status match {

http://git-wip-us.apache.org/repos/asf/spark/blob/60bd704b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 67ff257..8d8a18f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, 
RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.SparkConf
@@ -375,6 +375,38 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSQLContext with Predi
     }
   }
 
+  test("SPARK-14959: Do not call getFileBlockLocations on directories") {
+    // Setting PARALLEL_PARTITION_DISCOVERY_THRESHOLD to 2. So we will first
+    // list file statues at driver side and then for the level of p2, we will 
list
+    // file statues in parallel.
+    withSQLConf(
+      "fs.file.impl" -> classOf[MockDistributedFileSystem].getName,
+      SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "2") {
+      withTempPath { path =>
+        val tempDir = path.getCanonicalPath
+
+        Seq("p1=1/p2=2/p3=3/file1", "p1=1/p2=3/p3=3/file1").foreach { fileName 
=>
+          val file = new File(tempDir, fileName)
+          assert(file.getParentFile.exists() || file.getParentFile.mkdirs())
+          util.stringToFile(file, fileName)
+        }
+
+        val fileCatalog = new ListingFileCatalog(
+          sparkSession = spark,
+          paths = Seq(new Path(tempDir)),
+          parameters = Map.empty[String, String],
+          partitionSchema = None)
+        // This should not fail.
+        fileCatalog.listLeafFiles(Seq(new Path(tempDir)))
+
+        // Also have an integration test.
+        checkAnswer(
+          spark.read.text(tempDir).select("p1", "p2", "p3", "value"),
+          Row(1, 2, 3, "p1=1/p2=2/p3=3/file1") :: Row(1, 3, 3, 
"p1=1/p2=3/p3=3/file1") :: Nil)
+      }
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.
 
   protected val checkPartitionSchema =
@@ -530,3 +562,14 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
     Array(new BlockLocation(Array(s"host$count:50010"), Array(s"host$count"), 
0, len))
   }
 }
+
+// This file system is for SPARK-14959 (DistributedFileSystem will throw an 
exception
+// if we call getFileBlockLocations on a dir).
+class MockDistributedFileSystem extends RawLocalFileSystem {
+
+  override def getFileBlockLocations(
+      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
+    super.getFileBlockLocations(file, start, len)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to