Repository: spark
Updated Branches:
  refs/heads/master 6e02aec44 -> 33814f887


[SPARK-15307][SQL] speed up listing files for data source

## What changes were proposed in this pull request?

Currently, listing files is very slow if there are thousands of files, especially 
on the local file system, because:
1) FileStatus.getPermission() is very slow on the local file system, since it 
launches a subprocess and parses its stdout.
2) Creating a JobConf is very expensive (ClassUtil.findContainingJar() is slow).

This PR improves these by:
1) Using another constructor of LocatedFileStatus to avoid calling 
FileStatus.getPermission(); the permissions are not used by data sources.
2) Creating the JobConf only once within one task.

## How was this patch tested?

Manually tested on a partitioned table with 1828 partitions; the time 
to load the table decreased from 22 seconds to 1.6 seconds (most of the time is 
now spent in merging schema).

Author: Davies Liu <dav...@databricks.com>

Closes #13094 from davies/listing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33814f88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33814f88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33814f88

Branch: refs/heads/master
Commit: 33814f887aea339c99e14ce7f14ca6fcc6875015
Parents: 6e02aec
Author: Davies Liu <dav...@databricks.com>
Authored: Wed May 18 18:46:57 2016 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed May 18 18:46:57 2016 +0800

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala        |  9 ++---
 .../datasources/fileSourceInterfaces.scala      | 38 ++++++++++++++------
 2 files changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33814f88/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 5cee2b9..644e5d6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -77,12 +77,12 @@ class ListingFileCatalog(
     if (paths.length >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, 
sparkSession.sparkContext)
     } else {
+      // Dummy jobconf to get to the pathFilter defined in configuration
+      val jobConf = new JobConf(hadoopConf, this.getClass)
+      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        // Dummy jobconf to get to the pathFilter defined in configuration
-        val jobConf = new JobConf(hadoopConf, this.getClass)
-        val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
 
         val statuses = {
           val stats = 
Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
@@ -101,7 +101,8 @@ class ListingFileCatalog(
           // - Here we are calling `getFileBlockLocations` in a sequential 
manner, but it should a
           //   a big deal since we always use to `listLeafFilesInParallel` 
when the number of paths
           //   exceeds threshold.
-          case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, 
f.getLen))
+          case f =>
+            HadoopFsRelation.createLocatedFileStatus(f, 
fs.getFileBlockLocations(f, 0, f.getLen))
         }
       }.filterNot { status =>
         val name = status.getPath.getName

http://git-wip-us.apache.org/repos/asf/spark/blob/33814f88/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index b516297..8d332df 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -348,28 +348,40 @@ private[sql] object HadoopFsRelation extends Logging {
     pathName == "_SUCCESS" || pathName == "_temporary" || 
pathName.startsWith(".")
   }
 
+  /**
+   * Create a LocatedFileStatus using FileStatus and block locations.
+   */
+  def createLocatedFileStatus(f: FileStatus, locations: Array[BlockLocation]): 
LocatedFileStatus = {
+    // The other constructor of LocatedFileStatus will call 
FileStatus.getPermission(), which is
+    // very slow on some file system (RawLocalFileSystem, which is launch a 
subprocess and parse the
+    // stdout).
+    val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, 
f.getBlockSize,
+      f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
+    if (f.isSymlink) {
+      lfs.setSymlink(f.getSymlink)
+    }
+    lfs
+  }
+
   // We don't filter files/directories whose name start with "_" except 
"_temporary" here, as
   // specific data sources may take advantages over them (e.g. Parquet 
_metadata and
   // _common_metadata files). "_temporary" directories are explicitly ignored 
since failed
   // tasks/jobs may leave partial/corrupted data files there.  Files and 
directories whose name
   // start with "." are also ignored.
-  def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
+  def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): 
Array[FileStatus] = {
     logInfo(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
     if (shouldFilterOut(name)) {
       Array.empty
     } else {
-      // Dummy jobconf to get to the pathFilter defined in configuration
-      val jobConf = new JobConf(fs.getConf, this.getClass())
-      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
       val statuses = {
         val (dirs, files) = 
fs.listStatus(status.getPath).partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-        if (pathFilter != null) stats.filter(f => 
pathFilter.accept(f.getPath)) else stats
+        val stats = files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, 
filter))
+        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else 
stats
       }
       statuses.filterNot(status => 
shouldFilterOut(status.getPath.getName)).map {
         case f: LocatedFileStatus => f
-        case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, 
f.getLen))
+        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, 
f.getLen))
       }
     }
   }
@@ -403,9 +415,15 @@ private[sql] object HadoopFsRelation extends Logging {
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
 
-    val fakeStatuses = sparkContext.parallelize(serializedPaths).map(new 
Path(_)).flatMap { path =>
-      val fs = path.getFileSystem(serializableConfiguration.value)
-      Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty)
+    val fakeStatuses = sparkContext.parallelize(serializedPaths).mapPartitions 
{ paths =>
+      // Dummy jobconf to get to the pathFilter defined in configuration
+      // It's very expensive to create a JobConf(ClassUtil.findContainingJar() 
is slow)
+      val jobConf = new JobConf(serializableConfiguration.value, this.getClass)
+      val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+      paths.map(new Path(_)).flatMap { path =>
+        val fs = path.getFileSystem(serializableConfiguration.value)
+        Try(listLeafFiles(fs, fs.getFileStatus(path), 
pathFilter)).getOrElse(Array.empty)
+      }
     }.map { status =>
       val blockLocations = status match {
         case f: LocatedFileStatus =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to