This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f0ce48609 [CELEBORN-2150] Fix the match condition in 
checkIfWorkingDirCleaned
f0ce48609 is described below

commit f0ce486092744ffc1eaf4767da6c319e98e2771d
Author: xxx <[email protected]>
AuthorDate: Sat Sep 20 15:17:44 2025 +0800

    [CELEBORN-2150] Fix the match condition in checkIfWorkingDirCleaned
    
    ### What changes were proposed in this pull request?
    
    Fix the match condition in checkIfWorkingDirCleaned
    
    ### Why are the changes needed?
    
    `checkIfWorkingDirCleaned` has wrong match condition, `dfsCleaned` always 
return true.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    CI
    
    Closes #3477 from xy2953396112/CELEBORN-2150.
    
    Authored-by: xxx <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../deploy/worker/storage/StorageManager.scala     | 27 +++++++++++++---------
 1 file changed, 16 insertions(+), 11 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index bc684a86b..f2b045dd4 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -767,21 +767,26 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
               false
           }
         }
-
-      val dfsCleaned = hadoopFs match {
-        case dfs: FileSystem =>
-          val dfsDir = if (hasHDFSStorage) hdfsDir else if (hasOssStorage) 
ossDir else s3Dir
-          val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
-          // DFS path not exist when first time initialize
-          if (dfs.exists(dfsWorkPath)) {
-            !dfs.listFiles(dfsWorkPath, false).hasNext
-          } else {
-            true
+      val dfsCleaned = hadoopFs == null || hadoopFs.asScala.forall {
+        case (storageType, fs) =>
+          val dfsDir =
+            if (storageType == StorageInfo.Type.HDFS)
+              hdfsDir
+            else if (storageType == StorageInfo.Type.OSS) ossDir
+            else s3Dir
+          try {
+            val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
+            // DFS path not exist when first time initialize
+            !fs.exists(dfsWorkPath) || !fs.listFiles(dfsWorkPath, 
false).hasNext
+          } catch {
+            case t: Throwable =>
+              // When DFS is not accessible, assume it's cleaned
+              logError("checkIfWorkingDirCleaned failed.", t)
+              true
           }
         case _ =>
           true
       }
-
       if (localCleaned && dfsCleaned) {
         return true
       }

Reply via email to