[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r68095799

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -83,40 +83,10 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
--- End diff --

Yea. But I think we should avoid using `Try(...).getOrElse` whenever possible. It just swallows NonFatal exceptions, so we never handle them explicitly. Yea, the code that lists files on the driver does work on the executor side. But the changes here alter the behavior of this method. Originally, if any directory contained many files, we would launch a Spark job. After the change, we only check the children of the top-level path, and we only do the parallel listing (if we ever do) for the first layer.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
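The concern about `Try(...).getOrElse` raised in the comment above can be illustrated with a minimal sketch. The `list` function and the paths are hypothetical stand-ins, not Spark or Hadoop APIs, and treating a missing path as "no files" is an assumption made only for the example. The lenient variant turns every non-fatal failure into an empty result, while the strict variant tolerates only the failure it expects.

```scala
import java.io.{FileNotFoundException, IOException}
import scala.util.Try

object ListingErrors {
  // Hypothetical stand-in for a file-system listing call; not a Spark or Hadoop API.
  def list(path: String): Seq[String] = path match {
    case "/data"    => Seq("/data/part-0", "/data/part-1")
    case "/missing" => throw new FileNotFoundException(path)
    case other      => throw new IOException(s"permission denied: $other")
  }

  // Pattern under review: every non-fatal failure collapses into "no files".
  def listLenient(path: String): Seq[String] =
    Try(list(path)).getOrElse(Seq.empty)

  // Alternative: tolerate only the expected failure and let everything else propagate.
  def listStrict(path: String): Seq[String] =
    try list(path)
    catch { case _: FileNotFoundException => Seq.empty }
}
```

With the lenient variant, a permission problem on `/secret` is indistinguishable from an empty directory. The strict variant surfaces it as an `IOException` to the caller.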
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r67968229

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---
@@ -490,6 +491,7 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
   override def getFileBlockLocations(
       file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
--- End diff --

It's better to add a dedicated test anyway. Otherwise, when those tests get changed, we may lose this coverage.
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r67964313

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -83,40 +83,10 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
--- End diff --

The `Try` was also used previously: https://github.com/apache/spark/pull/13463/files#diff-87cabbe4d0c794f02523ecc1764955d0L88

BTW, I think the code that lists files on the driver side should also work on the executor side.
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r67956952

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -83,40 +83,10 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
--- End diff --

I am not sure that using `Try` here is the right choice. Are we trying to use `Try().getOrElse` to catch the error thrown by `getFileBlockLocations`?
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r67955807

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -83,40 +83,10 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
--- End diff --

`HadoopFsRelation.listLeafFiles` is the version that does the file listing work at the driver side.
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13463
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user xwu0226 commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65625548

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---
@@ -490,6 +491,7 @@ class LocalityTestFileSystem extends RawLocalFileSystem {
   override def getFileBlockLocations(
       file: FileStatus, start: Long, len: Long): Array[BlockLocation] = {
+    require(!file.isDirectory, "The file path can not be a directory.")
--- End diff --

This test suite already has test cases that create multiple partitions in a table, and the code path in this PR is exercised by these test cases. Adding the `require` here will catch any case where a directory is passed in to generate block locations.
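How the `require` guard discussed above catches a directory can be shown with a small self-contained sketch. `Status` is a hypothetical, simplified stand-in for Hadoop's `FileStatus`, and the host names are purely illustrative. The sketch mirrors only the shape of the guard, not the actual `LocalityTestFileSystem`.

```scala
object BlockLocationGuard {
  // Hypothetical, simplified stand-in for Hadoop's FileStatus.
  final case class Status(path: String, isDirectory: Boolean, len: Long)

  // Mirrors the guard added to the test file system: block locations are only defined for files.
  def blockLocations(file: Status, start: Long, len: Long): Seq[String] = {
    require(!file.isDirectory, "The file path can not be a directory.")
    Seq("host1", "host2") // illustrative host names only
  }
}
```

A dedicated test would pass a directory status and expect an `IllegalArgumentException`, which is what `require` throws when its condition is false. That keeps the check alive even if the partitioned-table tests that currently reach this code path are changed.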
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user xwu0226 commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65618083

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+        // the list of [[LocatedFileStatus]]
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+          getOrElse(Array.empty)
--- End diff --

@liancheng Thanks! Yes, I will put the NOTE back here. As for the `Try`, I see that [fileSourceInterfaces.listLeafFilesInParallel](https://github.com/apache/spark/blob/dcac8e6f49918a809fb3f2b8bf666582c479a6eb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala#L425) does the same. So, for consistency, I did likewise to avoid introducing any surprises.
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user xwu0226 commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65618152

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+        // the list of [[LocatedFileStatus]]
--- End diff --

OK. I will remove this comment.
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65612485

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+        // the list of [[LocatedFileStatus]]
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+          getOrElse(Array.empty)
--- End diff --

BTW, why is a `Try` necessary here?
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65612284

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+        // the list of [[LocatedFileStatus]]
--- End diff --

This comment might not be necessary.
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/13463#discussion_r65611847

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---
@@ -84,39 +84,12 @@ class ListingFileCatalog(
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        val statuses = {
-          val stats = Try(fs.listStatus(path)).getOrElse(Array.empty[FileStatus])
-          if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
-        }
-
-        statuses.map {
-          case f: LocatedFileStatus => f
-
-          // NOTE:
-          //
-          // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
-          //   operations, calling `getFileBlockLocations` does no harm here since these file system
-          //   implementations don't actually issue RPC for this method.
-          //
-          // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should a
-          //   a big deal since we always use to `listLeafFilesInParallel` when the number of paths
-          //   exceeds threshold.
-          case f =>
-            HadoopFsRelation.createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
-        }
-      }.filterNot { status =>
-        val name = status.getPath.getName
-        HadoopFsRelation.shouldFilterOut(name)
-      }
-
-      val (dirs, files) = statuses.partition(_.isDirectory)
-
-      // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
-      if (dirs.isEmpty) {
-        mutable.LinkedHashSet(files: _*)
-      } else {
-        mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+        // HadoopFsRelation.listLeafFiles digs out all the leaf files and creates
+        // the list of [[LocatedFileStatus]]
+        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
+          getOrElse(Array.empty)
--- End diff --

Thanks for simplifying this! Could you please move the original comments (the "NOTE" stuff) before [this line][1]?

At first, I special-cased S3* file systems in the original code, but removed that later for the reasons mentioned in the comment, and then ended up with something essentially identical to `HadoopFsRelation.listLeafFiles`. That's why there's such a weird duplication.

[1]: https://github.com/apache/spark/blob/8900c8d8ff1614b5ec5a2ce213832fa13462b4d4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala#L384
[GitHub] spark pull request #13463: [SPARK-14959][SQL] handle partitioned table direc...
GitHub user xwu0226 opened a pull request:

https://github.com/apache/spark/pull/13463

[SPARK-14959][SQL] handle partitioned table directories in distributed filesystem

## What changes were proposed in this pull request?

# The root cause:
When `DataSource.resolveRelation` builds a `ListingFileCatalog` object, `listLeafFiles` is invoked and retrieves a list of `FileStatus` objects from the provided path. These `FileStatus` objects include the directories for the partitions (id=0 and id=2 in the JIRA). However, `getFileBlockLocations` is also invoked on these directory `FileStatus` objects, and `DistributedFileSystem` does not allow a directory there, hence the exception.

This PR removes the block of code that invokes `getFileBlockLocations` for every `FileStatus` object of the provided path. Instead, we call `HadoopFsRelation.listLeafFiles` directly, because this utility method filters out the directories before calling `getFileBlockLocations` to generate `LocatedFileStatus` objects.

## How was this patch tested?

Regression tests were run. Manual test:
```
scala> spark.read.format("parquet").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_part").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+

spark.read.format("orc").load("hdfs://bdavm009.svl.ibm.com:8020/user/spark/SPARK-14959_orc").show
+-----+---+
| text| id|
+-----+---+
|hello|  0|
|world|  0|
|hello|  1|
|there|  1|
+-----+---+
```
I also tried it with two levels of partitioning. I have not found a way to add a test case that exercises a real HDFS file location. Any suggestions would be appreciated.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xwu0226/spark SPARK-14959

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13463.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #13463

commit 2f741878b8f357c61d75780fe911f922b4fd7847
Author: Xin Wu
Date: 2016-06-02T04:30:33Z

    SPARK-14959: handle partitioned table directories in distributed file system
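The root cause and fix described in the pull request above can be sketched in a few lines. Everything here is a simplified, hypothetical model: `Node`, `File`, `Dir` and both functions are illustration-only names rather than Hadoop or Spark types, and the `IllegalArgumentException` merely stands in for whatever error the real file system raises for a directory. The point is that recursing into directories first means only leaf files ever reach the block-location lookup.

```scala
object LeafListing {
  // Hypothetical, simplified model of a file tree; not Hadoop's FileStatus.
  sealed trait Node { def path: String }
  final case class File(path: String, len: Long) extends Node
  final case class Dir(path: String, children: Seq[Node]) extends Node

  // Stand-in for a block-location lookup that rejects directories.
  def blockLocations(node: Node): Seq[String] = node match {
    case File(_, _) => Seq("datanode-1") // illustrative host name only
    case Dir(p, _)  => throw new IllegalArgumentException(s"Not a file: $p")
  }

  // Descend into directories first, so blockLocations is only ever called on leaf files.
  def listLeafFiles(node: Node): Seq[(String, Seq[String])] = node match {
    case f: File      => Seq(f.path -> blockLocations(f))
    case Dir(_, kids) => kids.flatMap(listLeafFiles)
  }
}
```

Calling `listLeafFiles` on a table directory with partition subdirectories such as `id=0` and `id=1` returns only the part files, whereas calling `blockLocations` directly on a partition directory fails. That failure corresponds to the behaviour reported in the JIRA.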