spark git commit: [SPARK-14959][SQL] handle partitioned table directories in distributed filesystem
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d5bd64a20 -> 1e13d09c5


[SPARK-14959][SQL] handle partitioned table directories in distributed filesystem

## What changes were proposed in this pull request?

### The root cause:

When `DataSource.resolveRelation` is trying to build a `ListingFileCatalog` object, `listLeafFiles` is invoked, and a list of `FileStatus` objects is retrieved from the provided path. These `FileStatus` objects include the partition directories (id=0 and id=2 in the JIRA). However, `getFileBlockLocations` is then invoked on these directory `FileStatus` objects as well; `DistributedFileSystem` does not allow this call on directories, hence the exception.

This PR removes the block of code that invokes `getFileBlockLocations` for every `FileStatus` object of the provided path. Instead, we call `HadoopFsRelation.listLeafFiles` directly, because this utility method filters out the directories before calling `getFileBlockLocations` to generate the `LocatedFileStatus` objects.

## How was this patch tested?

Regression tests were run. Manual test:

```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+

scala> spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+
```

I also tried it with two levels of partitioning. I have not found a way to add a test case to the unit test bucket that tests against a real HDFS file location. Any suggestions will be appreciated.
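Test data of the same shape can be produced with a snippet along these lines — a hypothetical sketch, not the reporter's actual setup; the output path is a placeholder, and `partitionBy("id")` is what creates the `id=0/`, `id=1/` partition directories that triggered the bug:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical reproduction of the manual test data shown above.
val spark = SparkSession.builder().appName("SPARK-14959-repro").getOrCreate()
import spark.implicits._

val df = Seq(("hello", 0), ("world", 0), ("hello", 1), ("there", 1)).toDF("text", "id")

// partitionBy creates one subdirectory per distinct id value (id=0/, id=1/),
// i.e. exactly the directory layout the old listing code failed on.
df.write.partitionBy("id").parquet("hdfs:///user/spark/SPARK-14959_part")  // placeholder path

spark.read.format("parquet").load("hdfs:///user/spark/SPARK-14959_part").show()
```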
Author: Xin Wu

Closes #13463 from xwu0226/SPARK-14959.

(cherry picked from commit 76aa45d359d034e9ccaac64b36738d47e1e42f2c)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e13d09c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e13d09c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e13d09c

Branch: refs/heads/branch-2.0
Commit: 1e13d09c526ec37c344950b07d938751bbd6fd0a
Parents: d5bd64a
Author: Xin Wu
Authored: Thu Jun 2 22:49:17 2016 -0700
Committer: Cheng Lian
Committed: Thu Jun 2 22:49:24 2016 -0700

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala        | 36 ++--
 .../datasources/fileSourceInterfaces.scala      | 10 ++
 .../datasources/FileSourceStrategySuite.scala   |  1 +
 3 files changed, 14 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e13d09c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 644e5d6..dd3c96a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -83,40 +83,10 @@ class ListingFileCatalog(
     val statuses: Seq[FileStatus] = paths.flatMap { path =>
       val fs = path.getFileSystem(hadoopConf)
       logInfo(s"Listing $path on driver")
-
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }
-    }.filterNot { status =>
-      val name = status.getPath.getName
-      HadoopFsRelation.shouldFilterOut(name)
-    }
-
-    val (dirs, files) = statuses.partition(_.isDirectory)
-
-    // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-    if (dirs.isEmpty) {
-      mutable.LinkedHashSet(files: _*)
-    } else {
-      mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
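To make the fix concrete: the utility the new code delegates to filters out directories before requesting block locations. Below is an illustrative sketch of that pattern — the helper name and signature are hypothetical, not the actual `HadoopFsRelation.listLeafFiles` internals:

```scala
import scala.util.Try

import org.apache.hadoop.fs.{FileStatus, FileSystem, LocatedFileStatus, Path, PathFilter}

// Illustrative only. The essential step is partitioning out directories so that
// getFileBlockLocations is called for plain files exclusively; DistributedFileSystem
// rejects that call on a directory.
def listLeafFilesSafely(fs: FileSystem, path: Path, filter: PathFilter): Seq[LocatedFileStatus] = {
  val listed = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
  val accepted = if (filter != null) listed.filter(f => filter.accept(f.getPath)) else listed
  val (dirs, files) = accepted.partition(_.isDirectory)

  val located = files.toSeq.map {
    case f: LocatedFileStatus => f // block locations already attached; no extra call needed
    case f => new LocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
  }
  // Recurse into partition directories (id=0/, id=1/, ...) rather than asking
  // for their block locations.
  located ++ dirs.flatMap(d => listLeafFilesSafely(fs, d.getPath, filter))
}
```

As the diff above shows, the actual change simply delegates to the existing utility, wrapping the call in `Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter))`.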
spark git commit: [SPARK-14959][SQL] handle partitioned table directories in distributed filesystem
Repository: spark
Updated Branches:
  refs/heads/master 6dde27404 -> 76aa45d35


[SPARK-14959][SQL] handle partitioned table directories in distributed filesystem

## What changes were proposed in this pull request?

### The root cause:

When `DataSource.resolveRelation` is trying to build a `ListingFileCatalog` object, `listLeafFiles` is invoked, and a list of `FileStatus` objects is retrieved from the provided path. These `FileStatus` objects include the partition directories (id=0 and id=2 in the JIRA). However, `getFileBlockLocations` is then invoked on these directory `FileStatus` objects as well; `DistributedFileSystem` does not allow this call on directories, hence the exception.

This PR removes the block of code that invokes `getFileBlockLocations` for every `FileStatus` object of the provided path. Instead, we call `HadoopFsRelation.listLeafFiles` directly, because this utility method filters out the directories before calling `getFileBlockLocations` to generate the `LocatedFileStatus` objects.

## How was this patch tested?

Regression tests were run. Manual test:

```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+

scala> spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+
```

I also tried it with two levels of partitioning. I have not found a way to add a test case to the unit test bucket that tests against a real HDFS file location. Any suggestions will be appreciated.

Author: Xin Wu

Closes #13463 from xwu0226/SPARK-14959.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76aa45d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76aa45d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76aa45d3

Branch: refs/heads/master
Commit: 76aa45d359d034e9ccaac64b36738d47e1e42f2c
Parents: 6dde274
Author: Xin Wu
Authored: Thu Jun 2 22:49:17 2016 -0700
Committer: Cheng Lian
Committed: Thu Jun 2 22:49:17 2016 -0700

----------------------------------------------------------------------
 .../datasources/ListingFileCatalog.scala        | 36 ++--
 .../datasources/fileSourceInterfaces.scala      | 10 ++
 .../datasources/FileSourceStrategySuite.scala   |  1 +
 3 files changed, 14 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76aa45d3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 644e5d6..dd3c96a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -83,40 +83,10 @@ class ListingFileCatalog(
     val statuses: Seq[FileStatus] = paths.flatMap { path =>
       val fs = path.getFileSystem(hadoopConf)
       logInfo(s"Listing $path on driver")
-
-      val statuses = {
-        val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-        if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-      }
-
-      statuses.map {
-        case f: LocatedFileStatus => f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-        //   operations, calling `getFileBlockLocations` does no harm here since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
-        //   be a big deal since we always use `listLeafFilesInParallel` when the number of paths
-        //   exceeds threshold.
-        case f =>
-          HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-      }
-    }.filterNot { status =>
-      val name = status.getPath.getName
-      HadoopFsRelation.shouldFilterOut(name)
-    }
-
-    val (dirs, files) = statuses.partition(_.isDirectory)
-
-    // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-    if (dirs.isEmpty) {
-      mutable.LinkedHashSet(files: _*)
-    } else {
-      mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
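To illustrate the failure mode this change avoids — a hedged sketch with a placeholder path; per the description above, `DistributedFileSystem` raises an exception when `getFileBlockLocations` is invoked on a directory, which is what the old per-`FileStatus` loop did for partition directories:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Placeholder table root containing partition subdirectories such as id=0/ and id=1/.
val root = new Path("hdfs:///user/spark/SPARK-14959_part")
val fs = root.getFileSystem(new Configuration())

fs.listStatus(root).foreach { status =>
  if (status.isDirectory) {
    // The old ListingFileCatalog code reached this call for directories too,
    // which is what raised the exception on HDFS:
    // fs.getFileBlockLocations(status, 0, status.getLen)  // not allowed on a directory
    println(s"partition directory, skipping: ${status.getPath}")
  } else {
    val locations = fs.getFileBlockLocations(status, 0, status.getLen) // fine for plain files
    println(s"file ${status.getPath}: ${locations.length} block location(s)")
  }
}
```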