spark git commit: [SPARK-16121] ListingFileCatalog does not list in parallel anymore
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 838143a2a -> 60bd704b5

[SPARK-16121] ListingFileCatalog does not list in parallel anymore

## What changes were proposed in this pull request?

It seems the fix for SPARK-14959 broke parallel partition discovery. This PR fixes the problem.

## How was this patch tested?

Tested manually. (This PR also adds a proper test for SPARK-14959.)

Author: Yin Huai

Closes #13830 from yhuai/SPARK-16121.

(cherry picked from commit 39ad53f7ffddae5ba0ff0a76089ba671b14c44c8)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60bd704b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60bd704b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60bd704b

Branch: refs/heads/branch-2.0
Commit: 60bd704b541c4d1991922ffd3dd5b47de9bd5821
Parents: 838143a
Author: Yin Huai
Authored: Wed Jun 22 18:07:07 2016 +0800
Committer: Cheng Lian
Committed: Wed Jun 22 18:07:27 2016 +0800

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala       | 58 ++++++++++++++++++---
 .../datasources/fileSourceInterfaces.scala     |  7 ++-
 .../datasources/FileSourceStrategySuite.scala  | 45 +++++++++++++++-
 3 files changed, 101 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/60bd704b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index f713fde..675e755 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try

-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.sql.SparkSession
@@ -73,21 +73,67 @@ class ListingFileCatalog(
     cachedPartitionSpec = null
   }

-  protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+  /**
+   * List leaf files of given paths. This method will submit a Spark job to do parallel
+   * listing whenever there is a path having more files than the parallel partition
+   * discovery threshold.
+   */
+  protected[spark] def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
+      // Right now, the number of paths is less than the value of
+      // parallelPartitionDiscoveryThreshold. So, we will list file statuses at the driver.
+      // If there is any child that has more files than the threshold, we will use
+      // parallel listing.
+
+      // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(hadoopConf, this.getClass)
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logTrace(s"Listing $path on driver")
-        Try {
-          HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-        }.getOrElse(Array.empty[FileStatus])
+
+        val childStatuses = {
+          // TODO: We need to avoid using Try here.
+          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+        }
+
+        childStatuses.map {
+          case f: LocatedFileStatus => f
+
+          // NOTE:
+          //
+          // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
+          //   operations, calling `getFileBlockLocations` does no harm here since these file
+          //   system implementations don't actually issue an RPC for this method.
+          //
+          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it
+          //   should not be a big deal since we always use `listLeafFilesInParallel` when
+          //   the number of paths exceeds the threshold.
+          case f =>
+            if (f.isDirectory) {
+              // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959).
+              f
+            } else {
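(The archived diff above is cut off after the `} else {` that opens the non-directory branch.) For context, the threshold this patch keys on is the SQL option `spark.sql.sources.parallelPartitionDiscovery.threshold` (32 by default in Spark 2.0), surfaced in the code as `parallelPartitionDiscoveryThreshold`. A minimal sketch, assuming a local session and a hypothetical dataset at `/tmp/partitioned-table` (the path and app name are illustrative, not from the patch), of how one might force the parallel-listing branch this commit repairs:

    import org.apache.spark.sql.SparkSession

    object ParallelListingDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[4]")
          .appName("SPARK-16121 demo")
          // With the threshold at 1, any scan with at least one root path goes
          // through HadoopFsRelation.listLeafFilesInParallel, which submits a
          // Spark job so that executors, not the driver, list the leaf files.
          .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "1")
          .getOrCreate()

        // Hypothetical partitioned layout, e.g. /tmp/partitioned-table/part=0/...
        val df = spark.read.parquet("/tmp/partitioned-table")
        df.show()

        spark.stop()
      }
    }

At the default threshold, the driver-side branch in the patch handles small path sets and parallel listing only kicks in for wide tables; SPARK-16121 restores that second branch, which the SPARK-14959 fix had accidentally disabled.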
spark git commit: [SPARK-16121] ListingFileCatalog does not list in parallel anymore
Repository: spark
Updated Branches:
  refs/heads/master d281b0baf -> 39ad53f7f

[SPARK-16121] ListingFileCatalog does not list in parallel anymore

## What changes were proposed in this pull request?

It seems the fix for SPARK-14959 broke parallel partition discovery. This PR fixes the problem.

## How was this patch tested?

Tested manually. (This PR also adds a proper test for SPARK-14959.)

Author: Yin Huai

Closes #13830 from yhuai/SPARK-16121.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39ad53f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39ad53f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39ad53f7

Branch: refs/heads/master
Commit: 39ad53f7ffddae5ba0ff0a76089ba671b14c44c8
Parents: d281b0b
Author: Yin Huai
Authored: Wed Jun 22 18:07:07 2016 +0800
Committer: Cheng Lian
Committed: Wed Jun 22 18:07:07 2016 +0800

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala       | 58 ++++++++++++++++++---
 .../datasources/fileSourceInterfaces.scala     |  7 ++-
 .../datasources/FileSourceStrategySuite.scala  | 45 +++++++++++++++-
 3 files changed, 101 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/39ad53f7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index f713fde..675e755 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try

-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.sql.SparkSession
@@ -73,21 +73,67 @@ class ListingFileCatalog(
     cachedPartitionSpec = null
   }

-  protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+  /**
+   * List leaf files of given paths. This method will submit a Spark job to do parallel
+   * listing whenever there is a path having more files than the parallel partition
+   * discovery threshold.
+   */
+  protected[spark] def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
+      // Right now, the number of paths is less than the value of
+      // parallelPartitionDiscoveryThreshold. So, we will list file statuses at the driver.
+      // If there is any child that has more files than the threshold, we will use
+      // parallel listing.
+
+      // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(hadoopConf, this.getClass)
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logTrace(s"Listing $path on driver")
-        Try {
-          HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-        }.getOrElse(Array.empty[FileStatus])
+
+        val childStatuses = {
+          // TODO: We need to avoid using Try here.
+          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+        }
+
+        childStatuses.map {
+          case f: LocatedFileStatus => f
+
+          // NOTE:
+          //
+          // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata
+          //   operations, calling `getFileBlockLocations` does no harm here since these file
+          //   system implementations don't actually issue an RPC for this method.
+          //
+          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it
+          //   should not be a big deal since we always use `listLeafFilesInParallel` when
+          //   the number of paths exceeds the threshold.
+          case f =>
+            if (f.isDirectory) {
+              // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959).
+              f
+            } else {
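(As with the branch-2.0 copy above, the archived diff is cut off inside the non-directory branch, so the committed code after `} else {` does not appear here.) The NOTE just above spells out the intent, though: fetch block locations for each plain `FileStatus` and promote it to a `LocatedFileStatus`, so that later split planning keeps locality information. A rough sketch of that pattern against public Hadoop APIs, offered as an illustration under that reading rather than as the exact committed code:

    import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem, LocatedFileStatus}

    object LocatedStatusSketch {
      // Sketch: wrap a plain FileStatus in a LocatedFileStatus by fetching its
      // block locations from the file system. Directories are returned unchanged,
      // mirroring the SPARK-14959 guard above: block locations are meaningless
      // for a directory, so the extra metadata call would be wasted work.
      def withBlockLocations(fs: FileSystem, f: FileStatus): FileStatus = {
        if (f.isDirectory) {
          f
        } else {
          val locations: Array[BlockLocation] = fs.getFileBlockLocations(f, 0L, f.getLen)
          new LocatedFileStatus(f, locations)
        }
      }
    }

Matching `case f: LocatedFileStatus => f` first, as the patch does, also means that file systems whose `listStatus` already returns located statuses skip the extra call entirely.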