This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a0d27b10fb2 [SPARK-40058][CORE] Avoid filter file path more than once in HadoopFSUtils
a0d27b10fb2 is described below

commit a0d27b10fb21bdefe0cabacca303947cc46d282f
Author: guanziyue <30882822+guanzi...@users.noreply.github.com>
AuthorDate: Mon Aug 15 08:57:43 2022 -0500

    [SPARK-40058][CORE] Avoid filter file path more than once in HadoopFSUtils
    
    ### What changes were proposed in this pull request?
    Refactor the path filter logic in HadoopFSUtils so that the same filter is not applied to a file more than once. The method listLeafFiles is called recursively, and the filter runs in a single thread over all files on the driver side, so applying it repeatedly causes a performance issue when the filter logic is heavy.
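    
    As a minimal standalone sketch of the pattern (hypothetical, simplified code; the real logic lives in HadoopFSUtils.listLeafFiles, and childrenOf below stands in for a real file system listing), the old shape re-filtered the merged result at every level of recursion, while the new shape filters only the files first seen at each level:
    
        import org.apache.hadoop.fs.{FileStatus, PathFilter}
    
        object FilterOnceSketch {
          // Old shape: nested files returned by the recursive call were merged
          // with top-level files and the filter ran over the whole sequence,
          // so a file could be filtered once per enclosing directory level.
          def listOld(statuses: Seq[FileStatus], filter: PathFilter): Seq[FileStatus] = {
            val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
            val nestedFiles = dirs.flatMap(d => listOld(childrenOf(d), filter))
            val allFiles = topLevelFiles ++ nestedFiles
            if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
          }
    
          // New shape: only files first seen at this level are filtered; files
          // from the recursive call have already been filtered exactly once.
          def listNew(statuses: Seq[FileStatus], filter: PathFilter): Seq[FileStatus] = {
            val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
            val filteredNestedFiles = dirs.flatMap(d => listNew(childrenOf(d), filter))
            val filteredTopLevelFiles =
              if (filter != null) topLevelFiles.filter(f => filter.accept(f.getPath))
              else topLevelFiles
            filteredTopLevelFiles ++ filteredNestedFiles
          }
    
          // Placeholder; in HadoopFSUtils this is a real FileSystem listing call.
          private def childrenOf(dir: FileStatus): Seq[FileStatus] = Seq.empty
        }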
    
    ### Why are the changes needed?
    Apply the filter to each FileStatus only once, when it is first encountered, instead of re-filtering files returned by recursive calls.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No test was added, as the change is simple enough.
    
    Closes #37498 from guanziyue/SPARK-40058.
    
    Authored-by: guanziyue <30882822+guanzi...@users.noreply.github.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 60a73adc858..01dc3ba68cc 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -254,7 +254,7 @@ private[spark] object HadoopFSUtils extends Logging {
 
     val allLeafStatuses = {
       val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
-      val nestedFiles: Seq[FileStatus] = contextOpt match {
+      val filteredNestedFiles: Seq[FileStatus] = contextOpt match {
         case Some(context) if dirs.size > parallelismThreshold =>
           parallelListLeafFilesInternal(
             context,
@@ -281,8 +281,12 @@ private[spark] object HadoopFSUtils extends Logging {
               parallelismMax = parallelismMax)
           }
       }
-      val allFiles = topLevelFiles ++ nestedFiles
-      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
+      val filteredTopLevelFiles = if (filter != null) {
+        topLevelFiles.filter(f => filter.accept(f.getPath))
+      } else {
+        topLevelFiles
+      }
+      filteredTopLevelFiles ++ filteredNestedFiles
     }
 
     val missingFiles = mutable.ArrayBuffer.empty[String]

