spark git commit: [SPARK-16121] ListingFileCatalog does not list in parallel anymore

2016-06-22 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 838143a2a -> 60bd704b5


[SPARK-16121] ListingFileCatalog does not list in parallel anymore

## What changes were proposed in this pull request?
It seems the fix for SPARK-14959 broke parallel partition discovery. This PR fixes the problem (a short sketch of the restored listing behavior is included below).

## How was this patch tested?
Tested manually. (This PR also adds a proper test for SPARK-14959)
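
The decision this patch restores can be summarized with a minimal sketch. The helper parameters `listInParallel` and `listOnDriver` below are hypothetical stand-ins, not Spark APIs; the real code calls `HadoopFsRelation.listLeafFilesInParallel` for the parallel case and otherwise lists on the driver, as the diff further down shows.

```scala
// Minimal sketch of the listing decision restored by this patch: when the number of
// input paths reaches the parallel partition discovery threshold, leaf files are
// listed with a Spark job; otherwise they are listed on the driver.
// `listInParallel` and `listOnDriver` are hypothetical stand-ins for the real helpers.
def listLeafFilesSketch[Path, FileStatus](
    paths: Seq[Path],
    threshold: Int)(
    listInParallel: Seq[Path] => Seq[FileStatus],
    listOnDriver: Path => Seq[FileStatus]): Seq[FileStatus] = {
  if (paths.length >= threshold) {
    listInParallel(paths)          // one Spark job lists all paths in parallel
  } else {
    paths.flatMap(listOnDriver)    // small path sets are listed on the driver
  }
}
```

The actual `ListingFileCatalog` additionally falls back to parallel listing for any child directory that has more files than the threshold, per the comment in the diff below.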

Author: Yin Huai 

Closes #13830 from yhuai/SPARK-16121.

(cherry picked from commit 39ad53f7ffddae5ba0ff0a76089ba671b14c44c8)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60bd704b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60bd704b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60bd704b

Branch: refs/heads/branch-2.0
Commit: 60bd704b541c4d1991922ffd3dd5b47de9bd5821
Parents: 838143a
Author: Yin Huai 
Authored: Wed Jun 22 18:07:07 2016 +0800
Committer: Cheng Lian 
Committed: Wed Jun 22 18:07:27 2016 +0800

--
 .../datasources/ListingFileCatalog.scala| 58 ++--
 .../datasources/fileSourceInterfaces.scala  |  7 ++-
 .../datasources/FileSourceStrategySuite.scala   | 45 ++-
 3 files changed, 101 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/60bd704b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index f713fde..675e755 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.SparkSession
@@ -73,21 +73,67 @@ class ListingFileCatalog(
     cachedPartitionSpec = null
   }
 
-  protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+  /**
+   * List leaf files of given paths. This method will submit a Spark job to do parallel
+   * listing whenever there is a path having more files than the parallel partition
+   * discovery threshold.
+   */
+  protected[spark] def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
+      // Right now, the number of paths is less than the value of
+      // parallelPartitionDiscoveryThreshold. So, we will list file statuses at the driver.
+      // If there is any child that has more files than the threshold, we will use parallel
+      // listing.
+
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(hadoopConf, this.getClass)
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logTrace(s"Listing $path on driver")
-        Try {
-          HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-        }.getOrElse(Array.empty[FileStatus])
+
+        val childStatuses = {
+          // TODO: We need to avoid using Try here.
+          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+        }
+
+        childStatuses.map {
+          case f: LocatedFileStatus => f
+
+          // NOTE:
+          //
+          // - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
+          //   operations, calling `getFileBlockLocations` does no harm here since these file
+          //   system implementations don't actually issue RPCs for this method.
+          //
+          // - Here we are calling `getFileBlockLocations` sequentially, but it should not be
+          //   a big deal since we always use `listLeafFilesInParallel` when the number of
+          //   paths exceeds the threshold.
+          case f =>
+            if (f.isDirectory) {
+              // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959).

spark git commit: [SPARK-16121] ListingFileCatalog does not list in parallel anymore

2016-06-22 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master d281b0baf -> 39ad53f7f


[SPARK-16121] ListingFileCatalog does not list in parallel anymore

## What changes were proposed in this pull request?
It seems the fix for SPARK-14959 broke parallel partition discovery. This PR fixes the problem (a configuration sketch for the discovery threshold is included below).

## How was this patch tested?
Tested manually. (This PR also adds a proper test for SPARK-14959)
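
As a hedged usage sketch, the threshold that gates this behavior is read from the session conf as `parallelPartitionDiscoveryThreshold`; the SQL option key and the value used below are assumptions from memory, not taken from this diff (check `SQLConf` for the authoritative name and default).

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical tuning example. The option key below is an assumption (see
// SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD for the authoritative name);
// the patched code only reads sessionState.conf.parallelPartitionDiscoveryThreshold.
val spark = SparkSession.builder()
  .appName("parallel-listing-example")
  .master("local[*]")
  .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "8")
  .getOrCreate()

// With the threshold lowered, reading a table that spans at least 8 input paths
// exercises the parallel leaf-file listing path restored by this patch.
val df = spark.read.parquet("/path/to/partitioned/table")
```

Lowering the threshold forces the parallel code path even for small path sets, which is one way to exercise the behavior this patch restores.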

Author: Yin Huai 

Closes #13830 from yhuai/SPARK-16121.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39ad53f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39ad53f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39ad53f7

Branch: refs/heads/master
Commit: 39ad53f7ffddae5ba0ff0a76089ba671b14c44c8
Parents: d281b0b
Author: Yin Huai 
Authored: Wed Jun 22 18:07:07 2016 +0800
Committer: Cheng Lian 
Committed: Wed Jun 22 18:07:07 2016 +0800

--
 .../datasources/ListingFileCatalog.scala| 58 ++--
 .../datasources/fileSourceInterfaces.scala  |  7 ++-
 .../datasources/FileSourceStrategySuite.scala   | 45 ++-
 3 files changed, 101 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/39ad53f7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index f713fde..675e755 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.SparkSession
@@ -73,21 +73,67 @@ class ListingFileCatalog(
     cachedPartitionSpec = null
   }
 
-  protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+  /**
+   * List leaf files of given paths. This method will submit a Spark job to do parallel
+   * listing whenever there is a path having more files than the parallel partition
+   * discovery threshold.
+   */
+  protected[spark] def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
+      // Right now, the number of paths is less than the value of
+      // parallelPartitionDiscoveryThreshold. So, we will list file statuses at the driver.
+      // If there is any child that has more files than the threshold, we will use parallel
+      // listing.
+
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(hadoopConf, this.getClass)
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logTrace(s"Listing $path on driver")
-        Try {
-          HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-        }.getOrElse(Array.empty[FileStatus])
+
+        val childStatuses = {
+          // TODO: We need to avoid using Try here.
+          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
+          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
+        }
+
+        childStatuses.map {
+          case f: LocatedFileStatus => f
+
+          // NOTE:
+          //
+          // - Although S3/S3A/S3N file systems can be quite slow for remote file metadata
+          //   operations, calling `getFileBlockLocations` does no harm here since these file
+          //   system implementations don't actually issue RPCs for this method.
+          //
+          // - Here we are calling `getFileBlockLocations` sequentially, but it should not be
+          //   a big deal since we always use `listLeafFilesInParallel` when the number of
+          //   paths exceeds the threshold.
+          case f =>
+            if (f.isDirectory) {
+              // If f is a directory, we do not need to call getFileBlockLocations (SPARK-14959).
+              f
+            } else {