spark git commit: [SPARK-14959][SQL] handle partitioned table directories in distributed filesystem

2016-06-02 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d5bd64a20 -> 1e13d09c5


[SPARK-14959][SQL] handle partitioned table directories in distributed 
filesystem

## What changes were proposed in this pull request?
### The root cause
When `DataSource.resolveRelation` is building a `ListingFileCatalog` object, 
`listLeafFiles` is invoked to retrieve a list of `FileStatus` objects for the 
provided path. These `FileStatus` objects include the partition directories 
(id=0 and id=2 in the JIRA). However, `getFileBlockLocations` is then invoked 
on these directory `FileStatus` objects as well, and `DistributedFileSystem` 
does not allow that call on a directory, hence the exception.
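
For illustration, here is a minimal sketch of the failing pattern (the 
namenode host and table path are placeholders, not taken from the JIRA):
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical partitioned table root containing directories like id=0/ and id=2/.
val tableRoot = new Path("hdfs://namenode:8020/user/spark/partitioned_table")
val fs: FileSystem = tableRoot.getFileSystem(new Configuration())

fs.listStatus(tableRoot).foreach { status =>
  // status.isDirectory is true for the partition directories; on HDFS this
  // call is rejected for directories instead of returning block locations.
  fs.getFileBlockLocations(status, 0, status.getLen)
}
```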

This PR removes the block of code that invokes `getFileBlockLocations` for 
every `FileStatus` object of the provided path. Instead, we call 
`HadoopFsRelation.listLeafFiles` directly, because this utility method filters 
out directories before calling `getFileBlockLocations` to generate 
`LocatedFileStatus` objects.
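
Sketched below is the general shape of that directory-filtering approach; this 
is a simplified illustration of the idea, not the actual body of 
`HadoopFsRelation.listLeafFiles`:
```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus}

// Simplified sketch: recurse into subdirectories and call
// getFileBlockLocations only on plain files, so DistributedFileSystem
// never sees a directory in that call.
def listLeafStatuses(fs: FileSystem, dir: FileStatus): Seq[FileStatus] = {
  val (dirs, files) = fs.listStatus(dir.getPath).toSeq.partition(_.isDirectory)
  files.map { f =>
    new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
  } ++ dirs.flatMap(d => listLeafStatuses(fs, d))
}
```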

## How was this patch tested?
Regression tests were run. Manual test:
```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+

   
spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+
```
I also tried it with two levels of partitioning.
I have not found a way to add a test case to the unit test suite that can test 
a real HDFS file location. Any suggestions will be appreciated.
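
As a local approximation that needs no HDFS cluster (and therefore does not 
exercise `DistributedFileSystem`, which is what rejects directories), one 
could round-trip a partitioned table through a temp directory; the path below 
is a placeholder:
```scala
// Hypothetical spark-shell session: write a table partitioned by `id`,
// then read it back through the same ListingFileCatalog listing path.
import spark.implicits._

val df = Seq(("hello", 0), ("world", 0), ("hello", 1), ("there", 1)).toDF("text", "id")
df.write.mode("overwrite").partitionBy("id").parquet("/tmp/partitioned_table")
spark.read.format("parquet").load("/tmp/partitioned_table").show()
```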

Author: Xin Wu 

Closes #13463 from xwu0226/SPARK-14959.

(cherry picked from commit 76aa45d359d034e9ccaac64b36738d47e1e42f2c)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e13d09c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e13d09c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e13d09c

Branch: refs/heads/branch-2.0
Commit: 1e13d09c526ec37c344950b07d938751bbd6fd0a
Parents: d5bd64a
Author: Xin Wu 
Authored: Thu Jun 2 22:49:17 2016 -0700
Committer: Cheng Lian 
Committed: Thu Jun 2 22:49:24 2016 -0700

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala        | 36 +++---------------------------------
 .../datasources/fileSourceInterfaces.scala      | 10 ++++++++++
 .../datasources/FileSourceStrategySuite.scala   |  1 +
 3 files changed, 14 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e13d09c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 644e5d6..dd3c96a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -83,40 +83,10 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+