[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r68095799
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -83,40 +83,10 @@ class ListingFileCatalog(
    val statuses: Seq[FileStatus] = paths.flatMap { path =>
      val fs = path.getFileSystem(hadoopConf)
      logInfo(s"Listing $path on driver")
-
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
--- End diff --

Yea. But I think we should avoid using `Try(...).getOrElse` whenever possible. It just swallows NonFatal exceptions, so we never explicitly handle the failure.

Yea, the code that lists files at the driver does work at the executor side. But the changes here alter the behavior of this method. Originally, if any dir contained many files, we would launch a Spark job. After the change, we only check the children of the top-level path, so we only do the parallel listing (if we ever do) for the first layer.
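
To make the concern concrete, here is a minimal sketch contrasting the two styles (the helper names are hypothetical, not from the PR): `Try(...).getOrElse` maps every NonFatal failure to an empty result, while an explicit `try`/`catch` tolerates only the failures we actually intend to tolerate.

```scala
import scala.util.Try

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// `Try(...).getOrElse` silently maps every NonFatal failure (e.g. a transient
// HDFS error, a permission problem) to an empty array, so callers cannot
// distinguish "the path has no files" from "the listing failed".
def listSwallowing(fs: FileSystem, path: Path): Array[FileStatus] =
  Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])

// Explicit handling keeps the decision visible: tolerate only the failure we
// mean to tolerate (a missing path) and let everything else propagate.
def listExplicitly(fs: FileSystem, path: Path): Array[FileStatus] =
  try {
    fs.listStatus(path)
  } catch {
    case _: java.io.FileNotFoundException => Array.empty[FileStatus]
  }
```

With the second style, a transient HDFS outage surfaces as an error instead of being misread as an empty directory.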





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-21 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r67968229
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---
@@ -490,6 +491,7 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
 
   override def getFileBlockLocations(
      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
--- End diff --

It's better to add a dedicated test anyway. Otherwise, when those existing tests get changed, we may lose this coverage.
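
A minimal sketch of such a dedicated test, assuming the suite's usual helpers (`withTempDir` and `spark` from `SharedSQLContext`/`SQLTestUtils`) are in scope; the test name and structure are illustrative, not the actual test from the PR:

```scala
test("LocalityTestFileSystem.getFileBlockLocations rejects directories") {
  withTempDir { dir =>
    val fs = new LocalityTestFileSystem
    fs.initialize(fs.getUri, spark.sessionState.newHadoopConf())
    // A directory FileStatus must trip the require(...) added in this PR,
    // regardless of what the partitioned-table tests happen to exercise.
    val dirStatus = fs.getFileStatus(new org.apache.hadoop.fs.Path(dir.getCanonicalPath))
    intercept[IllegalArgumentException] {
      fs.getFileBlockLocations(dirStatus, 0, dirStatus.getLen)
    }
  }
}
```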





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r67964313
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -83,40 +83,10 @@ class ListingFileCatalog(
    val statuses: Seq[FileStatus] = paths.flatMap { path =>
      val fs = path.getFileSystem(hadoopConf)
      logInfo(s"Listing $path on driver")
-
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
--- End diff --

The `Try` was also used previously: 
https://github.com/apache/spark/pull/13463/files#diff-87cabbe4d0c794f02523ecc1764955d0L88

BTW, I think the code that lists files at the driver side should also work at the executor side.







[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-21 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r67956952
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -83,40 +83,10 @@ class ListingFileCatalog(
    val statuses: Seq[FileStatus] = paths.flatMap { path =>
      val fs = path.getFileSystem(hadoopConf)
      logInfo(s"Listing $path on driver")
-
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
--- End diff --

I am not sure using `Try` here is the right choice. Are we trying to use `Try().getOrElse` to catch the error thrown by `getFileBlockLocations`?





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-21 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r67955807
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -83,40 +83,10 @@ class ListingFileCatalog(
    val statuses: Seq[FileStatus] = paths.flatMap { path =>
      val fs = path.getFileSystem(hadoopConf)
      logInfo(s"Listing $path on driver")
-
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
--- End diff --

`HadoopFsRelation.listLeafFiles` is the version that does the file listing 
work at the driver side.





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13463





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-02 Thread xwu0226
Github user xwu0226 commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65625548
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---
@@ -490,6 +491,7 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
 
   override def getFileBlockLocations(
      file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
--- End diff --

This test suite already has test cases that create multiple partitions in a table, and the code path in this PR is exercised by those test cases. Adding the `require` here will catch any case where a directory is passed to generate block locations.





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-02 Thread xwu0226
Github user xwu0226 commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65618083
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
      val fs = path.getFileSystem(hadoopConf)
      logInfo(s"Listing $path on driver")
 
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+      // the list of [[LocatedFileStatus]]
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+        getOrElse(Array.empty)
--- End diff --

@liancheng Thanks! Yes, I will put back the NOTE here. As for the `Try`, I see [fileSourceInterfaces.listLeafFilesInParallel](https://github.com/apache/spark/blob/dcac8e6f49918a809fb3f2b8bf666582c479a6eb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala#L425) is doing the same thing, so for consistency I do the same, in case I introduce any surprises.





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-02 Thread xwu0226
Github user xwu0226 commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65618152
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
      val fs = path.getFileSystem(hadoopConf)
      logInfo(s"Listing $path on driver")
 
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+      // the list of [[LocatedFileStatus]]
--- End diff --

OK. I will remove this comment. 





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-02 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65612485
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
      val fs = path.getFileSystem(hadoopConf)
      logInfo(s"Listing $path on driver")
 
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+      // the list of [[LocatedFileStatus]]
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+        getOrElse(Array.empty)
--- End diff --

BTW, why is a `Try` necessary here?





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-02 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65612284
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
      val fs = path.getFileSystem(hadoopConf)
      logInfo(s"Listing $path on driver")
 
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+      // the list of [[LocatedFileStatus]]
--- End diff --

This comment might not be necessary.





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-02 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65611847
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
      val fs = path.getFileSystem(hadoopConf)
      logInfo(s"Listing $path on driver")
 
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+      // the list of [[LocatedFileStatus]]
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+        getOrElse(Array.empty)
--- End diff --

Thanks for simplifying this! Could you please move the original comments (the "NOTE" stuff) before [this line][1]?

At first, I special-cased S3* file systems in the original code, but removed that later for the reasons mentioned in the comment, and ended up with something essentially identical to `HadoopFsRelation.listLeafFiles`. That's why there's such a weird duplication.

[1]: https://github.com/apache/spark/blob/8900c8d8ff1614b5ec5a2ce213832fa13462b4d4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala#L384





[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...

2016-06-01 Thread xwu0226
GitHub user xwu0226 opened a pull request:

https://github.com/apache/spark/pull/13463

[SPARK-14959][SQL] handle partitioned table directories in distributed 
filesystem

## What changes were proposed in this pull request?
### The root cause
When `DataSource.resolveRelation` tries to build a `ListingFileCatalog` object, `listLeafFiles` is invoked to retrieve a list of `FileStatus` objects for the provided path. These `FileStatus` objects include the partition directories (id=0 and id=2 in the JIRA). However, `getFileBlockLocations` is then invoked on each of these directory `FileStatus` objects as well, and `DistributedFileSystem` does not allow a directory as its argument, hence the exception.

This PR removes the block of code that invokes `getFileBlockLocations` for every `FileStatus` object of the provided path. Instead, we call `HadoopFsRelation.listLeafFiles` directly, because this utility method filters out the directories before calling `getFileBlockLocations` to generate `LocatedFileStatus` objects.
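
The following is a simplified sketch of that filtering pattern, under the assumption that it mirrors what `HadoopFsRelation.listLeafFiles` does (names and structure are condensed for illustration, not the actual Spark source):

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, PathFilter}

// Simplified sketch of a listLeafFiles-style walk: descend into directories
// and call getFileBlockLocations only on regular files, so a directory
// FileStatus can never reach DistributedFileSystem.getFileBlockLocations.
def listLeafFilesSketch(fs: FileSystem, status: FileStatus, filter: PathFilter): Seq[FileStatus] =
  if (status.isDirectory) {
    fs.listStatus(status.getPath)
      .filter(s => filter == null || filter.accept(s.getPath))
      .flatMap(listLeafFilesSketch(fs, _, filter))
      .toSeq
  } else {
    status match {
      case located: LocatedFileStatus => Seq(located) // locations already attached
      case f =>
        // Safe: f is guaranteed to be a regular file at this point.
        Seq(new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen)))
    }
  }
```

The real method also filters out hidden and metadata files via `HadoopFsRelation.shouldFilterOut`, as visible in the diffs above.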

## How was this patch tested?
Regression tests were run. Manual test:
```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+

scala> spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+
```
I also tried it with two levels of partitioning.
I have not found a way to add a test case that exercises a real HDFS file location. Any suggestions would be appreciated.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xwu0226/spark SPARK-14959

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13463.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13463


commit 2f741878b8f357c61d75780fe911f922b4fd7847
Author: Xin Wu 
Date:   2016-06-02T04:30:33Z

SPARK-14959: handle partitioned table directories in distributed file system



