This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8ccfbc1  [SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core
8ccfbc1 is described below

commit 8ccfbc114e3e8d9fc919bf05602e02a506566e31
Author: Chao Sun <sunc...@apache.org>
AuthorDate: Thu Sep 24 10:58:52 2020 -0700

    [SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core

    ### What changes were proposed in this pull request?

    This moves and refactors the parallel listing utilities from `InMemoryFileIndex` to Spark core so they can be reused by modules besides SQL. Along the way, this also does some cleanup and refactoring:

    - Created a `HadoopFSUtils` object under core.
    - Moved `InMemoryFileIndex.bulkListLeafFiles` into `HadoopFSUtils.parallelListLeafFiles`. It now depends on a `SparkContext` instead of the `SparkSession` in SQL. Also added parameters for the values that used to be read from the session conf (`ignoreMissingFiles`, `ignoreLocality`, `parallelismThreshold` and `parallelismMax`), plus a new `filterFun` parameter (for additional filtering support; we may be able to merge this with the `filter` parameter in the future).
    - Moved `InMemoryFileIndex.listLeafFiles` into `HadoopFSUtils.listLeafFiles`, with changes similar to the above.

    ### Why are the changes needed?

    Currently the locality-aware parallel listing mechanism only applies to `InMemoryFileIndex`. By moving this to core, we can potentially reuse the same mechanism for other code paths as well.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Since this is mostly a refactoring, it relies on existing unit tests such as those for `InMemoryFileIndex`.

    Closes #29471 from sunchao/SPARK-32381.

    Lead-authored-by: Chao Sun <sunc...@apache.org>
    Co-authored-by: Holden Karau <hka...@apple.com>
    Co-authored-by: Chao Sun <sunc...@uber.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../org/apache/spark/util/HadoopFSUtils.scala | 352 ++++++++-------------
 .../spark/sql/execution/command/CommandUtils.scala | 2 +-
 .../execution/datasources/InMemoryFileIndex.scala | 297 +----------------
 3 files changed, 150 insertions(+), 501 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
similarity index 51%
copy from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
copy to core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index a488ed1..c0a135e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -15,7 +15,7 @@
 * limitations under the License.
*/ -package org.apache.spark.sql.execution.datasources +package org.apache.spark.util import java.io.FileNotFoundException @@ -25,175 +25,68 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.viewfs.ViewFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem -import org.apache.hadoop.mapred.{FileInputFormat, JobConf} -import org.apache.spark.SparkContext +import org.apache.spark._ +import org.apache.spark.annotation.Private import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.streaming.FileStreamSink -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration - /** - * A [[FileIndex]] that generates the list of files to process by recursively listing all the - * files present in `paths`. - * - * @param rootPathsSpecified the list of root table paths to scan (some of which might be - * filtered out later) - * @param parameters as set of options to control discovery - * @param userSpecifiedSchema an optional user specified schema that will be use to provide - * types for the discovered partitions + * Utility functions to simplify and speed-up file listing. */ -class InMemoryFileIndex( - sparkSession: SparkSession, - rootPathsSpecified: Seq[Path], - parameters: Map[String, String], - userSpecifiedSchema: Option[StructType], - fileStatusCache: FileStatusCache = NoopCache, - userSpecifiedPartitionSpec: Option[PartitionSpec] = None, - override val metadataOpsTimeNs: Option[Long] = None) - extends PartitioningAwareFileIndex( - sparkSession, parameters, userSpecifiedSchema, fileStatusCache) { - - // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir) - // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain - // such streaming metadata dir or files, e.g. 
when after globbing "basePath/*" where "basePath" - // is the output of a streaming query. - override val rootPaths = - rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf)) - - @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _ - @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ - @volatile private var cachedPartitionSpec: PartitionSpec = _ - - refresh0() - - override def partitionSpec(): PartitionSpec = { - if (cachedPartitionSpec == null) { - if (userSpecifiedPartitionSpec.isDefined) { - cachedPartitionSpec = userSpecifiedPartitionSpec.get - } else { - cachedPartitionSpec = inferPartitioning() - } - } - logTrace(s"Partition spec: $cachedPartitionSpec") - cachedPartitionSpec - } - - override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = { - cachedLeafFiles - } - - override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = { - cachedLeafDirToChildrenFiles - } - - override def refresh(): Unit = { - fileStatusCache.invalidateAll() - refresh0() - } - - private def refresh0(): Unit = { - val files = listLeafFiles(rootPaths) - cachedLeafFiles = - new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) - cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent) - cachedPartitionSpec = null - } - - override def equals(other: Any): Boolean = other match { - case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet - case _ => false - } - - override def hashCode(): Int = rootPaths.toSet.hashCode() - - /** - * List leaf files of given paths. This method will submit a Spark job to do parallel - * listing whenever there is a path having more files than the parallel partition discovery - * discovery threshold. - * - * This is publicly visible for testing. 
- */ - def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - val startTime = System.nanoTime() - val output = mutable.LinkedHashSet[FileStatus]() - val pathsToFetch = mutable.ArrayBuffer[Path]() - for (path <- paths) { - fileStatusCache.getLeafFiles(path) match { - case Some(files) => - HiveCatalogMetrics.incrementFileCacheHits(files.length) - output ++= files - case None => - pathsToFetch += path - } - () // for some reasons scalac 2.12 needs this; return type doesn't matter - } - val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) - val discovered = InMemoryFileIndex.bulkListLeafFiles( - pathsToFetch.toSeq, hadoopConf, filter, sparkSession, areRootPaths = true) - discovered.foreach { case (path, leafFiles) => - HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) - fileStatusCache.putLeafFiles(path, leafFiles.toArray) - output ++= leafFiles - } - logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to list leaf files" + - s" for ${paths.length} paths.") - output - } -} - -object InMemoryFileIndex extends Logging { - - /** A serializable variant of HDFS's BlockLocation. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. */ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) - +private[spark] object HadoopFSUtils extends Logging { /** * Lists a collection of paths recursively. Picks the listing strategy adaptively depending * on the number of paths to list. * * This may only be called on the driver. * + * @param sc Spark context used to run parallel listing. 
+ * @param paths Input paths to list + * @param hadoopConf Hadoop configuration + * @param filter Path filter used to exclude leaf files from result + * @param isRootLevel Whether the input paths are at the root level, i.e., they are the root + * paths as opposed to nested paths encountered during recursive calls of this. + * @param ignoreMissingFiles Ignore missing files that happen during recursive listing + * (e.g., due to race conditions) + * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false, + * this will return `FileStatus` without `BlockLocation` info. + * @param parallelismThreshold The threshold to enable parallelism. If the number of input paths + * is smaller than this value, this will fallback to use + * sequential listing. + * @param parallelismMax The maximum parallelism for listing. If the number of input paths is + * larger than this value, parallelism will be throttled to this value + * to avoid generating too many tasks. + * @param filterFun Optional predicate on the leaf files. Files who failed the check will be + * excluded from the results * @return for each input path, the set of discovered files for the path */ - private[sql] def bulkListLeafFiles( + def parallelListLeafFiles( + sc: SparkContext, paths: Seq[Path], hadoopConf: Configuration, filter: PathFilter, - sparkSession: SparkSession, - areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = { - - val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles - val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality + isRootLevel: Boolean, + ignoreMissingFiles: Boolean, + ignoreLocality: Boolean, + parallelismThreshold: Int, + parallelismMax: Int, + filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = { // Short-circuits parallel listing when serial listing is likely to be faster. 
- if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + if (paths.size <= parallelismThreshold) { return paths.map { path => val leafFiles = listLeafFiles( path, hadoopConf, filter, - Some(sparkSession), + Some(sc), ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - isRootPath = areRootPaths) + isRootPath = isRootLevel, + parallelismThreshold = parallelismThreshold, + parallelismMax = parallelismMax, + filterFun = filterFun) (path, leafFiles) } } @@ -202,73 +95,73 @@ object InMemoryFileIndex extends Logging { s" The first several paths are: ${paths.take(10).mkString(", ")}.") HiveCatalogMetrics.incrementParallelListingJobCount(1) - val sparkContext = sparkSession.sparkContext val serializableConfiguration = new SerializableConfiguration(hadoopConf) val serializedPaths = paths.map(_.toString) - val parallelPartitionDiscoveryParallelism = - sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism // Set the number of parallelism to prevent following file listing from generating many tasks // in case of large #defaultParallelism. - val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism) + val numParallelism = Math.min(paths.size, parallelismMax) - val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) + val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) val statusMap = try { val description = paths.size match { case 0 => - s"Listing leaf files and directories 0 paths" + "Listing leaf files and directories 0 paths" case 1 => s"Listing leaf files and directories for 1 path:<br/>${paths(0)}" case s => s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..." 
} - sparkContext.setJobDescription(description) - sparkContext + sc.setJobDescription(description) + sc .parallelize(serializedPaths, numParallelism) .mapPartitions { pathStrings => val hadoopConf = serializableConfiguration.value pathStrings.map(new Path(_)).toSeq.map { path => val leafFiles = listLeafFiles( - path, - hadoopConf, - filter, - None, + path = path, + hadoopConf = hadoopConf, + filter = filter, + contextOpt = None, // Can't execute parallel scans on workers ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - isRootPath = areRootPaths) + isRootPath = isRootLevel, + filterFun = filterFun, + parallelismThreshold = Int.MaxValue, + parallelismMax = 0) (path, leafFiles) }.iterator }.map { case (path, statuses) => - val serializableStatuses = statuses.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) + val serializableStatuses = statuses.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] } - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - } - (path.toString, serializableStatuses) - }.collect() + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + 
status.getAccessTime, + blockLocations) + } + (path.toString, serializableStatuses) + }.collect() } finally { - sparkContext.setJobDescription(previousJobDescription) + sc.setJobDescription(previousJobDescription) } // turn SerializableFileStatus back to Status @@ -287,9 +180,10 @@ object InMemoryFileIndex extends Logging { } } + // scalastyle:off argcount /** - * Lists a single filesystem path recursively. If a SparkSession object is specified, this - * function may launch Spark jobs to parallelize listing. + * Lists a single filesystem path recursively. If a `SparkContext` object is specified, this + * function may launch Spark jobs to parallelize listing based on `parallelismThreshold`. * * If sessionOpt is None, this may be called on executors. * @@ -299,10 +193,14 @@ object InMemoryFileIndex extends Logging { path: Path, hadoopConf: Configuration, filter: PathFilter, - sessionOpt: Option[SparkSession], + contextOpt: Option[SparkContext], ignoreMissingFiles: Boolean, ignoreLocality: Boolean, - isRootPath: Boolean): Seq[FileStatus] = { + isRootPath: Boolean, + filterFun: Option[String => Boolean], + parallelismThreshold: Int, + parallelismMax: Int): Seq[FileStatus] = { + logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) @@ -323,7 +221,7 @@ object InMemoryFileIndex extends Logging { case _ => fs.listStatus(path) } } catch { - // If we are listing a root path (e.g. a top level directory of a table), we need to + // If we are listing a root path for SQL (e.g. 
a top level directory of a table), we need to // ignore FileNotFoundExceptions during this root level of the listing because // // (a) certain code paths might construct an InMemoryFileIndex with root paths that @@ -347,29 +245,43 @@ object InMemoryFileIndex extends Logging { Array.empty[FileStatus] } - val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) + def doFilter(statuses: Array[FileStatus]) = filterFun match { + case Some(shouldFilterOut) => + statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) + case None => + statuses + } + val filteredStatuses = doFilter(statuses) val allLeafStatuses = { val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) - val nestedFiles: Seq[FileStatus] = sessionOpt match { - case Some(session) => - bulkListLeafFiles( + val nestedFiles: Seq[FileStatus] = contextOpt match { + case Some(context) if dirs.size > parallelismThreshold => + parallelListLeafFiles( + context, dirs.map(_.getPath), - hadoopConf, - filter, - session, - areRootPaths = false + hadoopConf = hadoopConf, + filter = filter, + isRootLevel = false, + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + filterFun = filterFun, + parallelismThreshold = parallelismThreshold, + parallelismMax = parallelismMax ).flatMap(_._2) case _ => dirs.flatMap { dir => listLeafFiles( - dir.getPath, - hadoopConf, - filter, - sessionOpt, + path = dir.getPath, + hadoopConf = hadoopConf, + filter = filter, + contextOpt = contextOpt, ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - isRootPath = false) + isRootPath = false, + filterFun = filterFun, + parallelismThreshold = parallelismThreshold, + parallelismMax = parallelismMax) } } val allFiles = topLevelFiles ++ nestedFiles @@ -377,8 +289,7 @@ object InMemoryFileIndex extends Logging { } val missingFiles = mutable.ArrayBuffer.empty[String] - val filteredLeafStatuses = allLeafStatuses.filterNot( - status => 
shouldFilterOut(status.getPath.getName)) + val filteredLeafStatuses = doFilter(allLeafStatuses) val resolvedLeafStatuses = filteredLeafStatuses.flatMap { case f: LocatedFileStatus => Some(f) @@ -390,7 +301,7 @@ object InMemoryFileIndex extends Logging { // implementations don't actually issue RPC for this method. // // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `bulkListLeafFiles` when the number of + // be a big deal since we always use to `parallelListLeafFiles` when the number of // paths exceeds threshold. case f if !ignoreLocality => // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), @@ -427,18 +338,23 @@ object InMemoryFileIndex extends Logging { resolvedLeafStatuses } + // scalastyle:on argcount - /** Checks if we should filter out this path name. */ - def shouldFilterOut(pathName: String): Boolean = { - // We filter follow paths: - // 1. everything that starts with _ and ., except _common_metadata and _metadata - // because Parquet needs to find those metadata files from leaf files returned by this method. - // We should refactor this logic to not mix metadata files with data files. - // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we - // should skip this file in case of double reading. - val exclude = (pathName.startsWith("_") && !pathName.contains("=")) || - pathName.startsWith(".") || pathName.endsWith("._COPYING_") - val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata") - exclude && !include - } + /** A serializable variant of HDFS's BlockLocation. */ + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + /** A serializable variant of HDFS's FileStatus. 
*/ + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index c047be7..8bf7504 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -163,7 +163,7 @@ object CommandUtils extends Logging { .getConfString("hive.exec.stagingdir", ".hive-staging") val filter = new PathFilterIgnoreNonData(stagingDir) val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten, - sparkSession.sessionState.newHadoopConf(), filter, sparkSession, areRootPaths = true).map { + sparkSession.sessionState.newHadoopConf(), filter, sparkSession, isRootLevel = true).map { case (_, files) => files.map(_.getLen).sum } // the size is 0 where paths(i) is not defined and sizes(i) where it is defined diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index a488ed1..130894e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -17,23 +17,18 @@ package org.apache.spark.sql.execution.datasources -import java.io.FileNotFoundException - import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ -import org.apache.hadoop.fs.viewfs.ViewFileSystem -import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.mapred.{FileInputFormat, JobConf} -import 
org.apache.spark.SparkContext import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.FileStreamSink import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.HadoopFSUtils /** @@ -133,7 +128,7 @@ class InMemoryFileIndex( } val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) val discovered = InMemoryFileIndex.bulkListLeafFiles( - pathsToFetch.toSeq, hadoopConf, filter, sparkSession, areRootPaths = true) + pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true) discovered.foreach { case (path, leafFiles) => HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) fileStatusCache.putLeafFiles(path, leafFiles.toArray) @@ -147,286 +142,24 @@ class InMemoryFileIndex( object InMemoryFileIndex extends Logging { - /** A serializable variant of HDFS's BlockLocation. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. */ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) - - /** - * Lists a collection of paths recursively. Picks the listing strategy adaptively depending - * on the number of paths to list. - * - * This may only be called on the driver. 
- * - * @return for each input path, the set of discovered files for the path - */ private[sql] def bulkListLeafFiles( paths: Seq[Path], hadoopConf: Configuration, filter: PathFilter, sparkSession: SparkSession, - areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = { - - val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles - val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality - - // Short-circuits parallel listing when serial listing is likely to be faster. - if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - return paths.map { path => - val leafFiles = listLeafFiles( - path, - hadoopConf, - filter, - Some(sparkSession), - ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, - isRootPath = areRootPaths) - (path, leafFiles) - } - } - - logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." + - s" The first several paths are: ${paths.take(10).mkString(", ")}.") - HiveCatalogMetrics.incrementParallelListingJobCount(1) - - val sparkContext = sparkSession.sparkContext - val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = paths.map(_.toString) - val parallelPartitionDiscoveryParallelism = - sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism - - // Set the number of parallelism to prevent following file listing from generating many tasks - // in case of large #defaultParallelism. - val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism) - - val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) - val statusMap = try { - val description = paths.size match { - case 0 => - s"Listing leaf files and directories 0 paths" - case 1 => - s"Listing leaf files and directories for 1 path:<br/>${paths(0)}" - case s => - s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..." 
- } - sparkContext.setJobDescription(description) - sparkContext - .parallelize(serializedPaths, numParallelism) - .mapPartitions { pathStrings => - val hadoopConf = serializableConfiguration.value - pathStrings.map(new Path(_)).toSeq.map { path => - val leafFiles = listLeafFiles( - path, - hadoopConf, - filter, - None, - ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, - isRootPath = areRootPaths) - (path, leafFiles) - }.iterator - }.map { case (path, statuses) => - val serializableStatuses = statuses.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - } - (path.toString, serializableStatuses) - }.collect() - } finally { - sparkContext.setJobDescription(previousJobDescription) - } - - // turn SerializableFileStatus back to Status - statusMap.map { case (path, serializableStatuses) => - val statuses = serializableStatuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, - new Path(f.path)), - blockLocations) - } - (new Path(path), statuses) - } - } - - /** - * Lists a single filesystem path recursively. If a SparkSession object is specified, this - * function may launch Spark jobs to parallelize listing. - * - * If sessionOpt is None, this may be called on executors. 
- * - * @return all children of path that match the specified filter. - */ - private def listLeafFiles( - path: Path, - hadoopConf: Configuration, - filter: PathFilter, - sessionOpt: Option[SparkSession], - ignoreMissingFiles: Boolean, - ignoreLocality: Boolean, - isRootPath: Boolean): Seq[FileStatus] = { - logTrace(s"Listing $path") - val fs = path.getFileSystem(hadoopConf) - - // Note that statuses only include FileStatus for the files and dirs directly under path, - // and does not include anything else recursively. - val statuses: Array[FileStatus] = try { - fs match { - // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode - // to retrieve the file status with the file block location. The reason to still fallback - // to listStatus is because the default implementation would potentially throw a - // FileNotFoundException which is better handled by doing the lookups manually below. - case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality => - val remoteIter = fs.listLocatedStatus(path) - new Iterator[LocatedFileStatus]() { - def next(): LocatedFileStatus = remoteIter.next - def hasNext(): Boolean = remoteIter.hasNext - }.toArray - case _ => fs.listStatus(path) - } - } catch { - // If we are listing a root path (e.g. a top level directory of a table), we need to - // ignore FileNotFoundExceptions during this root level of the listing because - // - // (a) certain code paths might construct an InMemoryFileIndex with root paths that - // might not exist (i.e. not all callers are guaranteed to have checked - // path existence prior to constructing InMemoryFileIndex) and, - // (b) we need to ignore deleted root paths during REFRESH TABLE, otherwise we break - // existing behavior and break the ability drop SessionCatalog tables when tables' - // root directories have been deleted (which breaks a number of Spark's own tests). 
- // - // If we are NOT listing a root path then a FileNotFoundException here means that the - // directory was present in a previous level of file listing but is absent in this - // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3 - // list inconsistency). - // - // The trade-off in supporting existing behaviors / use-cases is that we won't be - // able to detect race conditions involving root paths being deleted during - // InMemoryFileIndex construction. However, it's still a net improvement to detect and - // fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion. - case _: FileNotFoundException if isRootPath || ignoreMissingFiles => - logWarning(s"The directory $path was not found. Was it deleted very recently?") - Array.empty[FileStatus] - } - - val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) - - val allLeafStatuses = { - val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) - val nestedFiles: Seq[FileStatus] = sessionOpt match { - case Some(session) => - bulkListLeafFiles( - dirs.map(_.getPath), - hadoopConf, - filter, - session, - areRootPaths = false - ).flatMap(_._2) - case _ => - dirs.flatMap { dir => - listLeafFiles( - dir.getPath, - hadoopConf, - filter, - sessionOpt, - ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, - isRootPath = false) - } - } - val allFiles = topLevelFiles ++ nestedFiles - if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles - } - - val missingFiles = mutable.ArrayBuffer.empty[String] - val filteredLeafStatuses = allLeafStatuses.filterNot( - status => shouldFilterOut(status.getPath.getName)) - val resolvedLeafStatuses = filteredLeafStatuses.flatMap { - case f: LocatedFileStatus => - Some(f) - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here 
since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `bulkListLeafFiles` when the number of - // paths exceeds threshold. - case f if !ignoreLocality => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). - try { - val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc => - // Store BlockLocation objects to consume less memory - if (loc.getClass == classOf[BlockLocation]) { - loc - } else { - new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength) - } - } - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) - } - Some(lfs) - } catch { - case _: FileNotFoundException if ignoreMissingFiles => - missingFiles += f.getPath.toString - None - } - - case f => Some(f) - } - - if (missingFiles.nonEmpty) { - logWarning( - s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}") - } - - resolvedLeafStatuses - } + isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = { + HadoopFSUtils.parallelListLeafFiles( + sc = sparkSession.sparkContext, + paths = paths, + hadoopConf = hadoopConf, + filter = filter, + isRootLevel = isRootLevel, + ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles, + ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality, + parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold, + parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism, + filterFun = Some(shouldFilterOut)) + } /** Checks if we should filter out 
this path name. */ def shouldFilterOut(pathName: String): Boolean = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
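
For readers skimming the diff, the two decisions that `parallelListLeafFiles` makes from its new parameters, and the optional name filter applied in `listLeafFiles`, can be sketched without Spark or Hadoop. This is only an illustration under stated assumptions: `ListingSketch`, `usesParallelListing` and `numParallelism` are hypothetical names that do not appear in the commit, the filter operates on plain file names instead of `FileStatus` objects, and the numbers in `main` are arbitrary example values. The `shouldFilterOut` body is copied from the lines shown in the diff.

```scala
object ListingSketch {
  // Body copied from the `shouldFilterOut` lines shown in the diff.
  def shouldFilterOut(pathName: String): Boolean = {
    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
    exclude && !include
  }

  // Mirrors `doFilter` in the diff, but on file names (an assumption made
  // to keep the sketch free of Hadoop types): no predicate means no filtering.
  def doFilter(names: Seq[String], filterFun: Option[String => Boolean]): Seq[String] =
    filterFun match {
      case Some(f) => names.filterNot(f)
      case None => names
    }

  // Hypothetical helper: the diff lists serially when
  // `paths.size <= parallelismThreshold`, so parallel listing needs more paths.
  def usesParallelListing(numPaths: Int, parallelismThreshold: Int): Boolean =
    numPaths > parallelismThreshold

  // Hypothetical helper: same expression as `numParallelism` in the diff,
  // which caps the number of tasks at `parallelismMax`.
  def numParallelism(numPaths: Int, parallelismMax: Int): Int =
    math.min(numPaths, parallelismMax)

  def main(args: Array[String]): Unit = {
    val names = Seq("_SUCCESS", "part-0", "_metadata", ".hidden", "a._COPYING_", "_col=1")
    val predicate: String => Boolean = shouldFilterOut
    // Keeps part-0, _metadata and _col=1; drops the other three names.
    println(doFilter(names, Some(predicate)))
    println(usesParallelListing(numPaths = 33, parallelismThreshold = 32))
    println(numParallelism(numPaths = 50000, parallelismMax = 10000))
  }
}
```

Passing `None` as the predicate returns the input unchanged, which is how a caller outside SQL could opt out of the SQL-specific hidden-file rules.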