This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f0ce48609 [CELEBORN-2150] Fix the match condition in checkIfWorkingDirCleaned
f0ce48609 is described below
commit f0ce486092744ffc1eaf4767da6c319e98e2771d
Author: xxx <[email protected]>
AuthorDate: Sat Sep 20 15:17:44 2025 +0800
[CELEBORN-2150] Fix the match condition in checkIfWorkingDirCleaned
### What changes were proposed in this pull request?
Fix the match condition in checkIfWorkingDirCleaned
### Why are the changes needed?
`checkIfWorkingDirCleaned` has a wrong match condition, so `dfsCleaned` always
returns true.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3477 from xy2953396112/CELEBORN-2150.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../deploy/worker/storage/StorageManager.scala | 27 +++++++++++++---------
1 file changed, 16 insertions(+), 11 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index bc684a86b..f2b045dd4 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -767,21 +767,26 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
false
}
}
-
- val dfsCleaned = hadoopFs match {
- case dfs: FileSystem =>
- val dfsDir = if (hasHDFSStorage) hdfsDir else if (hasOssStorage) ossDir else s3Dir
- val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
- // DFS path not exist when first time initialize
- if (dfs.exists(dfsWorkPath)) {
- !dfs.listFiles(dfsWorkPath, false).hasNext
- } else {
- true
+ val dfsCleaned = hadoopFs == null || hadoopFs.asScala.forall {
+ case (storageType, fs) =>
+ val dfsDir =
+ if (storageType == StorageInfo.Type.HDFS)
+ hdfsDir
+ else if (storageType == StorageInfo.Type.OSS) ossDir
+ else s3Dir
+ try {
+ val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
+ // DFS path not exist when first time initialize
!fs.exists(dfsWorkPath) || !fs.listFiles(dfsWorkPath, false).hasNext
+ } catch {
+ case t: Throwable =>
+ // When DFS is not accessible, assume it's cleaned
+ logError("checkIfWorkingDirCleaned failed.", t)
+ true
}
case _ =>
true
}
-
if (localCleaned && dfsCleaned) {
return true
}