This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ccfbc1  [SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core
8ccfbc1 is described below

commit 8ccfbc114e3e8d9fc919bf05602e02a506566e31
Author: Chao Sun <sunc...@apache.org>
AuthorDate: Thu Sep 24 10:58:52 2020 -0700

    [SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core
    
    ### What changes were proposed in this pull request?
    
    This moves and refactors the parallel listing utilities from `InMemoryFileIndex` to Spark core so that they can be reused by modules besides SQL. Along the way, it also does some cleanup/refactoring:

    - Created a `HadoopFSUtils` class under core
    - Moved `InMemoryFileIndex.bulkListLeafFiles` into `HadoopFSUtils.parallelListLeafFiles`. It now depends on a `SparkContext` instead of a SQL `SparkSession`. Also added a few parameters that used to be read from `SparkSession.conf`: `ignoreMissingFiles`, `ignoreLocality`, `parallelismThreshold`, `parallelismMax` and `filterFun` (for additional filtering support, though we may be able to merge this with the `filter` parameter in the future). A usage sketch follows the list below.
    - Moved `InMemoryFileIndex.listLeafFiles` into `HadoopFSUtils.listLeafFiles`, with changes similar to the above.
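
    For illustration, here is a minimal sketch of how a caller inside Spark might invoke the new utility after this change. The helper name, the example path, and the numeric values are hypothetical, not Spark defaults; only the `HadoopFSUtils.parallelListLeafFiles` signature comes from this patch, and since the object is `private[spark]`, real callers must live under the `org.apache.spark` package:

    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}

    import org.apache.spark.SparkContext
    import org.apache.spark.util.HadoopFSUtils

    // Hypothetical helper; the path and numeric values below are illustrative only.
    def listTableFiles(sc: SparkContext): Seq[(Path, Seq[FileStatus])] = {
      val hadoopConf: Configuration = sc.hadoopConfiguration
      // Accept every path here; name-based pruning is done via `filterFun` instead.
      val acceptAll: PathFilter = (_: Path) => true

      HadoopFSUtils.parallelListLeafFiles(
        sc = sc,
        paths = Seq(new Path("/tmp/example/table")),  // illustrative input path
        hadoopConf = hadoopConf,
        filter = acceptAll,
        isRootLevel = true,          // these are top-level root paths
        ignoreMissingFiles = false,  // surface files deleted mid-listing below the roots
        ignoreLocality = false,      // also fetch BlockLocation info
        parallelismThreshold = 32,   // below this many paths, list serially
        parallelismMax = 10000,      // cap the number of listing tasks
        // filterFun returns true for names that should be *excluded* from the result.
        filterFun = Some(name => name.startsWith(".") || name.endsWith("._COPYING_")))
    }
    ```

    On executors, the same logic runs through `listLeafFiles` with `contextOpt = None`, so no nested Spark jobs are launched from workers.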
    
    ### Why are the changes needed?
    
    Currently the locality-aware parallel listing mechanism only applies to `InMemoryFileIndex`. By moving this to core, we can potentially reuse the same mechanism for other code paths as well.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Since this is mostly a refactoring, it relies on existing unit tests such as those for `InMemoryFileIndex`.
    
    Closes #29471 from sunchao/SPARK-32381.
    
    Lead-authored-by: Chao Sun <sunc...@apache.org>
    Co-authored-by: Holden Karau <hka...@apple.com>
    Co-authored-by: Chao Sun <sunc...@uber.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../org/apache/spark/util/HadoopFSUtils.scala      | 352 ++++++++-------------
 .../spark/sql/execution/command/CommandUtils.scala |   2 +-
 .../execution/datasources/InMemoryFileIndex.scala  | 297 +----------------
 3 files changed, 150 insertions(+), 501 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
similarity index 51%
copy from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
copy to core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index a488ed1..c0a135e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.util
 
 import java.io.FileNotFoundException
 
@@ -25,175 +25,68 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.viewfs.ViewFileSystem
 import org.apache.hadoop.hdfs.DistributedFileSystem
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
-import org.apache.spark.SparkContext
+import org.apache.spark._
+import org.apache.spark.annotation.Private
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.FileStreamSink
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
-
 
 /**
- * A [[FileIndex]] that generates the list of files to process by recursively listing all the
- * files present in `paths`.
- *
- * @param rootPathsSpecified the list of root table paths to scan (some of which might be
- *                           filtered out later)
- * @param parameters as set of options to control discovery
- * @param userSpecifiedSchema an optional user specified schema that will be use to provide
- *                            types for the discovered partitions
+ * Utility functions to simplify and speed-up file listing.
  */
-class InMemoryFileIndex(
-    sparkSession: SparkSession,
-    rootPathsSpecified: Seq[Path],
-    parameters: Map[String, String],
-    userSpecifiedSchema: Option[StructType],
-    fileStatusCache: FileStatusCache = NoopCache,
-    userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
-    override val metadataOpsTimeNs: Option[Long] = None)
-  extends PartitioningAwareFileIndex(
-    sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {
-
-  // Filter out streaming metadata dirs or files such as 
"/.../_spark_metadata" (the metadata dir)
-  // or "/.../_spark_metadata/0" (a file in the metadata dir). 
`rootPathsSpecified` might contain
-  // such streaming metadata dir or files, e.g. when after globbing 
"basePath/*" where "basePath"
-  // is the output of a streaming query.
-  override val rootPaths =
-    rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, 
hadoopConf))
-
-  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, 
FileStatus] = _
-  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, 
Array[FileStatus]] = _
-  @volatile private var cachedPartitionSpec: PartitionSpec = _
-
-  refresh0()
-
-  override def partitionSpec(): PartitionSpec = {
-    if (cachedPartitionSpec == null) {
-      if (userSpecifiedPartitionSpec.isDefined) {
-        cachedPartitionSpec = userSpecifiedPartitionSpec.get
-      } else {
-        cachedPartitionSpec = inferPartitioning()
-      }
-    }
-    logTrace(s"Partition spec: $cachedPartitionSpec")
-    cachedPartitionSpec
-  }
-
-  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
-    cachedLeafFiles
-  }
-
-  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] 
= {
-    cachedLeafDirToChildrenFiles
-  }
-
-  override def refresh(): Unit = {
-    fileStatusCache.invalidateAll()
-    refresh0()
-  }
-
-  private def refresh0(): Unit = {
-    val files = listLeafFiles(rootPaths)
-    cachedLeafFiles =
-      new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => 
f.getPath -> f)
-    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
-    cachedPartitionSpec = null
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
-    case _ => false
-  }
-
-  override def hashCode(): Int = rootPaths.toSet.hashCode()
-
-  /**
-   * List leaf files of given paths. This method will submit a Spark job to do 
parallel
-   * listing whenever there is a path having more files than the parallel 
partition discovery
-   * discovery threshold.
-   *
-   * This is publicly visible for testing.
-   */
-  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
-    val startTime = System.nanoTime()
-    val output = mutable.LinkedHashSet[FileStatus]()
-    val pathsToFetch = mutable.ArrayBuffer[Path]()
-    for (path <- paths) {
-      fileStatusCache.getLeafFiles(path) match {
-        case Some(files) =>
-          HiveCatalogMetrics.incrementFileCacheHits(files.length)
-          output ++= files
-        case None =>
-          pathsToFetch += path
-      }
-      () // for some reasons scalac 2.12 needs this; return type doesn't matter
-    }
-    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, 
this.getClass))
-    val discovered = InMemoryFileIndex.bulkListLeafFiles(
-      pathsToFetch.toSeq, hadoopConf, filter, sparkSession, areRootPaths = 
true)
-    discovered.foreach { case (path, leafFiles) =>
-      HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
-      fileStatusCache.putLeafFiles(path, leafFiles.toArray)
-      output ++= leafFiles
-    }
-    logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to 
list leaf files" +
-      s" for ${paths.length} paths.")
-    output
-  }
-}
-
-object InMemoryFileIndex extends Logging {
-
-  /** A serializable variant of HDFS's BlockLocation. */
-  private case class SerializableBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  /** A serializable variant of HDFS's FileStatus. */
-  private case class SerializableFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[SerializableBlockLocation])
-
+private[spark] object HadoopFSUtils extends Logging {
   /**
    * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
    * on the number of paths to list.
    *
    * This may only be called on the driver.
    *
+   * @param sc Spark context used to run parallel listing.
+   * @param paths Input paths to list
+   * @param hadoopConf Hadoop configuration
+   * @param filter Path filter used to exclude leaf files from result
+   * @param isRootLevel Whether the input paths are at the root level, i.e., they are the root
+   *                    paths as opposed to nested paths encountered during recursive calls of this.
+   * @param ignoreMissingFiles Ignore missing files that happen during recursive listing
+   *                           (e.g., due to race conditions)
+   * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false,
+   *                       this will return `FileStatus` without `BlockLocation` info.
+   * @param parallelismThreshold The threshold to enable parallelism. If the number of input paths
+   *                             is smaller than this value, this will fallback to use
+   *                             sequential listing.
+   * @param parallelismMax The maximum parallelism for listing. If the number of input paths is
+   *                       larger than this value, parallelism will be throttled to this value
+   *                       to avoid generating too many tasks.
+   * @param filterFun Optional predicate on the leaf files. Files who failed the check will be
+   *                  excluded from the results
    * @return for each input path, the set of discovered files for the path
    */
-  private[sql] def bulkListLeafFiles(
+  def parallelListLeafFiles(
+      sc: SparkContext,
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession,
-      areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
-
-    val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
-    val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality
+      isRootLevel: Boolean,
+      ignoreMissingFiles: Boolean,
+      ignoreLocality: Boolean,
+      parallelismThreshold: Int,
+      parallelismMax: Int,
+      filterFun: Option[String => Boolean] = None): Seq[(Path, 
Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be 
faster.
-    if (paths.size <= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+    if (paths.size <= parallelismThreshold) {
       return paths.map { path =>
         val leafFiles = listLeafFiles(
           path,
           hadoopConf,
           filter,
-          Some(sparkSession),
+          Some(sc),
           ignoreMissingFiles = ignoreMissingFiles,
           ignoreLocality = ignoreLocality,
-          isRootPath = areRootPaths)
+          isRootPath = isRootLevel,
+          parallelismThreshold = parallelismThreshold,
+          parallelismMax = parallelismMax,
+          filterFun = filterFun)
         (path, leafFiles)
       }
     }
@@ -202,73 +95,73 @@ object InMemoryFileIndex extends Logging {
       s" The first several paths are: ${paths.take(10).mkString(", ")}.")
     HiveCatalogMetrics.incrementParallelListingJobCount(1)
 
-    val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
-    val parallelPartitionDiscoveryParallelism =
-      sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
 
     // Set the number of parallelism to prevent following file listing from 
generating many tasks
     // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 
parallelPartitionDiscoveryParallelism)
+    val numParallelism = Math.min(paths.size, parallelismMax)
 
-    val previousJobDescription = 
sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+    val previousJobDescription = 
sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
     val statusMap = try {
       val description = paths.size match {
         case 0 =>
-          s"Listing leaf files and directories 0 paths"
+          "Listing leaf files and directories 0 paths"
         case 1 =>
           s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
         case s =>
           s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, 
..."
       }
-      sparkContext.setJobDescription(description)
-      sparkContext
+      sc.setJobDescription(description)
+      sc
         .parallelize(serializedPaths, numParallelism)
         .mapPartitions { pathStrings =>
           val hadoopConf = serializableConfiguration.value
           pathStrings.map(new Path(_)).toSeq.map { path =>
             val leafFiles = listLeafFiles(
-              path,
-              hadoopConf,
-              filter,
-              None,
+              path = path,
+              hadoopConf = hadoopConf,
+              filter = filter,
+              contextOpt = None, // Can't execute parallel scans on workers
               ignoreMissingFiles = ignoreMissingFiles,
               ignoreLocality = ignoreLocality,
-              isRootPath = areRootPaths)
+              isRootPath = isRootLevel,
+              filterFun = filterFun,
+              parallelismThreshold = Int.MaxValue,
+              parallelismMax = 0)
             (path, leafFiles)
           }.iterator
         }.map { case (path, statuses) =>
-        val serializableStatuses = statuses.map { status =>
-          // Turn FileStatus into SerializableFileStatus so we can send it 
back to the driver
-          val blockLocations = status match {
-            case f: LocatedFileStatus =>
-              f.getBlockLocations.map { loc =>
-                SerializableBlockLocation(
-                  loc.getNames,
-                  loc.getHosts,
-                  loc.getOffset,
-                  loc.getLength)
+            val serializableStatuses = statuses.map { status =>
+              // Turn FileStatus into SerializableFileStatus so we can send it 
back to the driver
+              val blockLocations = status match {
+                case f: LocatedFileStatus =>
+                  f.getBlockLocations.map { loc =>
+                    SerializableBlockLocation(
+                      loc.getNames,
+                      loc.getHosts,
+                      loc.getOffset,
+                      loc.getLength)
+                  }
+
+                case _ =>
+                  Array.empty[SerializableBlockLocation]
               }
 
-            case _ =>
-              Array.empty[SerializableBlockLocation]
-          }
-
-          SerializableFileStatus(
-            status.getPath.toString,
-            status.getLen,
-            status.isDirectory,
-            status.getReplication,
-            status.getBlockSize,
-            status.getModificationTime,
-            status.getAccessTime,
-            blockLocations)
-        }
-        (path.toString, serializableStatuses)
-      }.collect()
+              SerializableFileStatus(
+                status.getPath.toString,
+                status.getLen,
+                status.isDirectory,
+                status.getReplication,
+                status.getBlockSize,
+                status.getModificationTime,
+                status.getAccessTime,
+                blockLocations)
+            }
+            (path.toString, serializableStatuses)
+        }.collect()
     } finally {
-      sparkContext.setJobDescription(previousJobDescription)
+      sc.setJobDescription(previousJobDescription)
     }
 
     // turn SerializableFileStatus back to Status
@@ -287,9 +180,10 @@ object InMemoryFileIndex extends Logging {
     }
   }
 
+  // scalastyle:off argcount
   /**
-   * Lists a single filesystem path recursively. If a SparkSession object is 
specified, this
-   * function may launch Spark jobs to parallelize listing.
+   * Lists a single filesystem path recursively. If a `SparkContext` object is 
specified, this
+   * function may launch Spark jobs to parallelize listing based on 
`parallelismThreshold`.
    *
    * If sessionOpt is None, this may be called on executors.
    *
@@ -299,10 +193,14 @@ object InMemoryFileIndex extends Logging {
       path: Path,
       hadoopConf: Configuration,
       filter: PathFilter,
-      sessionOpt: Option[SparkSession],
+      contextOpt: Option[SparkContext],
       ignoreMissingFiles: Boolean,
       ignoreLocality: Boolean,
-      isRootPath: Boolean): Seq[FileStatus] = {
+      isRootPath: Boolean,
+      filterFun: Option[String => Boolean],
+      parallelismThreshold: Int,
+      parallelismMax: Int): Seq[FileStatus] = {
+
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
 
@@ -323,7 +221,7 @@ object InMemoryFileIndex extends Logging {
         case _ => fs.listStatus(path)
       }
     } catch {
-      // If we are listing a root path (e.g. a top level directory of a 
table), we need to
+      // If we are listing a root path for SQL (e.g. a top level directory of 
a table), we need to
       // ignore FileNotFoundExceptions during this root level of the listing 
because
       //
       //  (a) certain code paths might construct an InMemoryFileIndex with 
root paths that
@@ -347,29 +245,43 @@ object InMemoryFileIndex extends Logging {
         Array.empty[FileStatus]
     }
 
-    val filteredStatuses = statuses.filterNot(status => 
shouldFilterOut(status.getPath.getName))
+    def doFilter(statuses: Array[FileStatus]) = filterFun match {
+      case Some(shouldFilterOut) =>
+        statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
+      case None =>
+        statuses
+    }
 
+    val filteredStatuses = doFilter(statuses)
     val allLeafStatuses = {
       val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
-      val nestedFiles: Seq[FileStatus] = sessionOpt match {
-        case Some(session) =>
-          bulkListLeafFiles(
+      val nestedFiles: Seq[FileStatus] = contextOpt match {
+        case Some(context) if dirs.size > parallelismThreshold =>
+          parallelListLeafFiles(
+            context,
             dirs.map(_.getPath),
-            hadoopConf,
-            filter,
-            session,
-            areRootPaths = false
+            hadoopConf = hadoopConf,
+            filter = filter,
+            isRootLevel = false,
+            ignoreMissingFiles = ignoreMissingFiles,
+            ignoreLocality = ignoreLocality,
+            filterFun = filterFun,
+            parallelismThreshold = parallelismThreshold,
+            parallelismMax = parallelismMax
           ).flatMap(_._2)
         case _ =>
           dirs.flatMap { dir =>
             listLeafFiles(
-              dir.getPath,
-              hadoopConf,
-              filter,
-              sessionOpt,
+              path = dir.getPath,
+              hadoopConf = hadoopConf,
+              filter = filter,
+              contextOpt = contextOpt,
               ignoreMissingFiles = ignoreMissingFiles,
               ignoreLocality = ignoreLocality,
-              isRootPath = false)
+              isRootPath = false,
+              filterFun = filterFun,
+              parallelismThreshold = parallelismThreshold,
+              parallelismMax = parallelismMax)
           }
       }
       val allFiles = topLevelFiles ++ nestedFiles
@@ -377,8 +289,7 @@ object InMemoryFileIndex extends Logging {
     }
 
     val missingFiles = mutable.ArrayBuffer.empty[String]
-    val filteredLeafStatuses = allLeafStatuses.filterNot(
-      status => shouldFilterOut(status.getPath.getName))
+    val filteredLeafStatuses = doFilter(allLeafStatuses)
     val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
       case f: LocatedFileStatus =>
         Some(f)
@@ -390,7 +301,7 @@ object InMemoryFileIndex extends Logging {
       //   implementations don't actually issue RPC for this method.
       //
       // - Here we are calling `getFileBlockLocations` in a sequential manner, 
but it should not
-      //   be a big deal since we always use to `bulkListLeafFiles` when the 
number of
+      //   be a big deal since we always use to `parallelListLeafFiles` when 
the number of
       //   paths exceeds threshold.
       case f if !ignoreLocality =>
         // The other constructor of LocatedFileStatus will call 
FileStatus.getPermission(),
@@ -427,18 +338,23 @@ object InMemoryFileIndex extends Logging {
 
     resolvedLeafStatuses
   }
+  // scalastyle:on argcount
 
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter follow paths:
-    // 1. everything that starts with _ and ., except _common_metadata and 
_metadata
-    // because Parquet needs to find those metadata files from leaf files 
returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    // 2. everything that ends with `._COPYING_`, because this is a 
intermediate state of file. we
-    // should skip this file in case of double reading.
-    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
-      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
-    val include = pathName.startsWith("_common_metadata") || 
pathName.startsWith("_metadata")
-    exclude && !include
-  }
+  /** A serializable variant of HDFS's BlockLocation. */
+  private case class SerializableBlockLocation(
+    names: Array[String],
+    hosts: Array[String],
+    offset: Long,
+    length: Long)
+
+  /** A serializable variant of HDFS's FileStatus. */
+  private case class SerializableFileStatus(
+    path: String,
+    length: Long,
+    isDir: Boolean,
+    blockReplication: Short,
+    blockSize: Long,
+    modificationTime: Long,
+    accessTime: Long,
+    blockLocations: Array[SerializableBlockLocation])
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index c047be7..8bf7504 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -163,7 +163,7 @@ object CommandUtils extends Logging {
       .getConfString("hive.exec.stagingdir", ".hive-staging")
     val filter = new PathFilterIgnoreNonData(stagingDir)
     val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten,
-      sparkSession.sessionState.newHadoopConf(), filter, sparkSession, 
areRootPaths = true).map {
+      sparkSession.sessionState.newHadoopConf(), filter, sparkSession, 
isRootLevel = true).map {
       case (_, files) => files.map(_.getLen).sum
     }
     // the size is 0 where paths(i) is not defined and sizes(i) where it is 
defined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index a488ed1..130894e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -17,23 +17,18 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.FileNotFoundException
-
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
-import org.apache.hadoop.fs.viewfs.ViewFileSystem
-import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.FileStreamSink
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.HadoopFSUtils
 
 
 /**
@@ -133,7 +128,7 @@ class InMemoryFileIndex(
     }
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, 
this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
-      pathsToFetch.toSeq, hadoopConf, filter, sparkSession, areRootPaths = 
true)
+      pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -147,286 +142,24 @@ class InMemoryFileIndex(
 
 object InMemoryFileIndex extends Logging {
 
-  /** A serializable variant of HDFS's BlockLocation. */
-  private case class SerializableBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  /** A serializable variant of HDFS's FileStatus. */
-  private case class SerializableFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[SerializableBlockLocation])
-
-  /**
-   * Lists a collection of paths recursively. Picks the listing strategy 
adaptively depending
-   * on the number of paths to list.
-   *
-   * This may only be called on the driver.
-   *
-   * @return for each input path, the set of discovered files for the path
-   */
   private[sql] def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
       sparkSession: SparkSession,
-      areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
-
-    val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
-    val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality
-
-    // Short-circuits parallel listing when serial listing is likely to be 
faster.
-    if (paths.size <= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      return paths.map { path =>
-        val leafFiles = listLeafFiles(
-          path,
-          hadoopConf,
-          filter,
-          Some(sparkSession),
-          ignoreMissingFiles = ignoreMissingFiles,
-          ignoreLocality = ignoreLocality,
-          isRootPath = areRootPaths)
-        (path, leafFiles)
-      }
-    }
-
-    logInfo(s"Listing leaf files and directories in parallel under 
${paths.length} paths." +
-      s" The first several paths are: ${paths.take(10).mkString(", ")}.")
-    HiveCatalogMetrics.incrementParallelListingJobCount(1)
-
-    val sparkContext = sparkSession.sparkContext
-    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-    val serializedPaths = paths.map(_.toString)
-    val parallelPartitionDiscoveryParallelism =
-      sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
-
-    // Set the number of parallelism to prevent following file listing from 
generating many tasks
-    // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 
parallelPartitionDiscoveryParallelism)
-
-    val previousJobDescription = 
sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
-    val statusMap = try {
-      val description = paths.size match {
-        case 0 =>
-          s"Listing leaf files and directories 0 paths"
-        case 1 =>
-          s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
-        case s =>
-          s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, 
..."
-      }
-      sparkContext.setJobDescription(description)
-      sparkContext
-        .parallelize(serializedPaths, numParallelism)
-        .mapPartitions { pathStrings =>
-          val hadoopConf = serializableConfiguration.value
-          pathStrings.map(new Path(_)).toSeq.map { path =>
-            val leafFiles = listLeafFiles(
-              path,
-              hadoopConf,
-              filter,
-              None,
-              ignoreMissingFiles = ignoreMissingFiles,
-              ignoreLocality = ignoreLocality,
-              isRootPath = areRootPaths)
-            (path, leafFiles)
-          }.iterator
-        }.map { case (path, statuses) =>
-        val serializableStatuses = statuses.map { status =>
-          // Turn FileStatus into SerializableFileStatus so we can send it 
back to the driver
-          val blockLocations = status match {
-            case f: LocatedFileStatus =>
-              f.getBlockLocations.map { loc =>
-                SerializableBlockLocation(
-                  loc.getNames,
-                  loc.getHosts,
-                  loc.getOffset,
-                  loc.getLength)
-              }
-
-            case _ =>
-              Array.empty[SerializableBlockLocation]
-          }
-
-          SerializableFileStatus(
-            status.getPath.toString,
-            status.getLen,
-            status.isDirectory,
-            status.getReplication,
-            status.getBlockSize,
-            status.getModificationTime,
-            status.getAccessTime,
-            blockLocations)
-        }
-        (path.toString, serializableStatuses)
-      }.collect()
-    } finally {
-      sparkContext.setJobDescription(previousJobDescription)
-    }
-
-    // turn SerializableFileStatus back to Status
-    statusMap.map { case (path, serializableStatuses) =>
-      val statuses = serializableStatuses.map { f =>
-        val blockLocations = f.blockLocations.map { loc =>
-          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-        }
-        new LocatedFileStatus(
-          new FileStatus(
-            f.length, f.isDir, f.blockReplication, f.blockSize, 
f.modificationTime,
-            new Path(f.path)),
-          blockLocations)
-      }
-      (new Path(path), statuses)
-    }
-  }
-
-  /**
-   * Lists a single filesystem path recursively. If a SparkSession object is 
specified, this
-   * function may launch Spark jobs to parallelize listing.
-   *
-   * If sessionOpt is None, this may be called on executors.
-   *
-   * @return all children of path that match the specified filter.
-   */
-  private def listLeafFiles(
-      path: Path,
-      hadoopConf: Configuration,
-      filter: PathFilter,
-      sessionOpt: Option[SparkSession],
-      ignoreMissingFiles: Boolean,
-      ignoreLocality: Boolean,
-      isRootPath: Boolean): Seq[FileStatus] = {
-    logTrace(s"Listing $path")
-    val fs = path.getFileSystem(hadoopConf)
-
-    // Note that statuses only include FileStatus for the files and dirs 
directly under path,
-    // and does not include anything else recursively.
-    val statuses: Array[FileStatus] = try {
-      fs match {
-        // DistributedFileSystem overrides listLocatedStatus to make 1 single 
call to namenode
-        // to retrieve the file status with the file block location. The 
reason to still fallback
-        // to listStatus is because the default implementation would 
potentially throw a
-        // FileNotFoundException which is better handled by doing the lookups 
manually below.
-        case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality 
=>
-          val remoteIter = fs.listLocatedStatus(path)
-          new Iterator[LocatedFileStatus]() {
-            def next(): LocatedFileStatus = remoteIter.next
-            def hasNext(): Boolean = remoteIter.hasNext
-          }.toArray
-        case _ => fs.listStatus(path)
-      }
-    } catch {
-      // If we are listing a root path (e.g. a top level directory of a 
table), we need to
-      // ignore FileNotFoundExceptions during this root level of the listing 
because
-      //
-      //  (a) certain code paths might construct an InMemoryFileIndex with 
root paths that
-      //      might not exist (i.e. not all callers are guaranteed to have 
checked
-      //      path existence prior to constructing InMemoryFileIndex) and,
-      //  (b) we need to ignore deleted root paths during REFRESH TABLE, 
otherwise we break
-      //      existing behavior and break the ability drop SessionCatalog 
tables when tables'
-      //      root directories have been deleted (which breaks a number of 
Spark's own tests).
-      //
-      // If we are NOT listing a root path then a FileNotFoundException here 
means that the
-      // directory was present in a previous level of file listing but is 
absent in this
-      // listing, likely indicating a race condition (e.g. concurrent table 
overwrite or S3
-      // list inconsistency).
-      //
-      // The trade-off in supporting existing behaviors / use-cases is that we 
won't be
-      // able to detect race conditions involving root paths being deleted 
during
-      // InMemoryFileIndex construction. However, it's still a net improvement 
to detect and
-      // fail-fast on the non-root cases. For more info see the SPARK-27676 
review discussion.
-      case _: FileNotFoundException if isRootPath || ignoreMissingFiles =>
-        logWarning(s"The directory $path was not found. Was it deleted very 
recently?")
-        Array.empty[FileStatus]
-    }
-
-    val filteredStatuses = statuses.filterNot(status => 
shouldFilterOut(status.getPath.getName))
-
-    val allLeafStatuses = {
-      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
-      val nestedFiles: Seq[FileStatus] = sessionOpt match {
-        case Some(session) =>
-          bulkListLeafFiles(
-            dirs.map(_.getPath),
-            hadoopConf,
-            filter,
-            session,
-            areRootPaths = false
-          ).flatMap(_._2)
-        case _ =>
-          dirs.flatMap { dir =>
-            listLeafFiles(
-              dir.getPath,
-              hadoopConf,
-              filter,
-              sessionOpt,
-              ignoreMissingFiles = ignoreMissingFiles,
-              ignoreLocality = ignoreLocality,
-              isRootPath = false)
-          }
-      }
-      val allFiles = topLevelFiles ++ nestedFiles
-      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else 
allFiles
-    }
-
-    val missingFiles = mutable.ArrayBuffer.empty[String]
-    val filteredLeafStatuses = allLeafStatuses.filterNot(
-      status => shouldFilterOut(status.getPath.getName))
-    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
-      case f: LocatedFileStatus =>
-        Some(f)
-
-      // NOTE:
-      //
-      // - Although S3/S3A/S3N file system can be quite slow for remote file 
metadata
-      //   operations, calling `getFileBlockLocations` does no harm here since 
these file system
-      //   implementations don't actually issue RPC for this method.
-      //
-      // - Here we are calling `getFileBlockLocations` in a sequential manner, 
but it should not
-      //   be a big deal since we always use to `bulkListLeafFiles` when the 
number of
-      //   paths exceeds threshold.
-      case f if !ignoreLocality =>
-        // The other constructor of LocatedFileStatus will call 
FileStatus.getPermission(),
-        // which is very slow on some file system (RawLocalFileSystem, which 
is launch a
-        // subprocess and parse the stdout).
-        try {
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
-            // Store BlockLocation objects to consume less memory
-            if (loc.getClass == classOf[BlockLocation]) {
-              loc
-            } else {
-              new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, 
loc.getLength)
-            }
-          }
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, 
f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, 
locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
-          }
-          Some(lfs)
-        } catch {
-          case _: FileNotFoundException if ignoreMissingFiles =>
-            missingFiles += f.getPath.toString
-            None
-        }
-
-      case f => Some(f)
-    }
-
-    if (missingFiles.nonEmpty) {
-      logWarning(
-        s"the following files were missing during file scan:\n  
${missingFiles.mkString("\n  ")}")
-    }
-
-    resolvedLeafStatuses
-  }
+      isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = {
+    HadoopFSUtils.parallelListLeafFiles(
+      sc = sparkSession.sparkContext,
+      paths = paths,
+      hadoopConf = hadoopConf,
+      filter = filter,
+      isRootLevel = isRootLevel,
+      ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
+      ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
+      parallelismThreshold = 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
+      parallelismMax = 
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism,
+      filterFun = Some(shouldFilterOut))
+ }
 
   /** Checks if we should filter out this path name. */
   def shouldFilterOut(pathName: String): Boolean = {

