This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new d8270cb35 [CELEBORN-2159] Fix dfs storage type check in StorageManager#cleanupExpiredShuffleKey
d8270cb35 is described below

commit d8270cb35e1ff32fed49929a96b174596bf0a957
Author: xxx <[email protected]>
AuthorDate: Tue Oct 14 10:23:33 2025 +0800

    [CELEBORN-2159] Fix dfs storage type check in StorageManager#cleanupExpiredShuffleKey
    
    ### What changes were proposed in this pull request?
    
    Fix dfs storage type check in `StorageManager#cleanupExpiredShuffleKey`.
    
    ### Why are the changes needed?
    
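    When multiple DFS files need to be cleaned up, the values of `isHdfs` and
    `isOss` are overwritten by every processed file, so after the loop they
    only reflect the last file that was cleaned rather than all storage types
    that actually need cleanup. Because a single directory is then deleted
    based on those flags, this may clean up the wrong DFS storage path and
    leave the expired shuffle directories of the other storage types behind.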
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3486 from xy2953396112/CELEBORN-2159.
    
    Authored-by: xxx <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 9482808c3180f4e057aa35c9e97c257095c00446)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../deploy/worker/storage/StorageManager.scala     | 27 ++++++++--------------
 1 file changed, 10 insertions(+), 17 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 636941f3f..170cc124e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -26,10 +26,11 @@ import java.util.concurrent.atomic.{AtomicInteger, 
AtomicLong}
 import java.util.function.{BiConsumer, IntUnaryOperator}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.concurrent.duration._
 
 import com.google.common.annotations.VisibleForTesting
-import io.netty.buffer.{ByteBufAllocator, PooledByteBufAllocator}
+import io.netty.buffer.ByteBufAllocator
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
@@ -638,16 +639,12 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       logInfo(s"Cleanup expired shuffle $shuffleKey.")
       if (diskFileInfos.containsKey(shuffleKey)) {
         val removedFileInfos = diskFileInfos.remove(shuffleKey)
-        var isDfsExpired = false
-        var isHdfs = false
-        var isOss = false
+        val expireStorageTypes = mutable.Set[StorageInfo.Type]()
         if (removedFileInfos != null) {
           removedFileInfos.asScala.foreach {
             case (_, fileInfo) =>
               if (cleanFileInternal(shuffleKey, fileInfo)) {
-                isDfsExpired = true
-                isHdfs = fileInfo.isHdfs
-                isOss = fileInfo.isOSS
+                expireStorageTypes.add(fileInfo.getStorageType)
               }
           }
         }
@@ -660,23 +657,19 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
             deleteDirectory(file, diskOperators.get(diskInfo.mountPoint))
           }
         }
-        if (isDfsExpired) {
+        expireStorageTypes.foreach(storageType => {
           try {
             val dir =
-              if (hasHDFSStorage && isHdfs) hdfsDir
-              else if (hasOssStorage && isOss) ossDir
+              if (storageType == StorageInfo.Type.HDFS) hdfsDir
+              else if (storageType == StorageInfo.Type.OSS) ossDir
               else s3Dir
-            val storageInfo =
-              if (hasHDFSStorage && isHdfs) StorageInfo.Type.HDFS
-              else if (hasOssStorage && isOss) StorageInfo.Type.OSS
-              else StorageInfo.Type.S3
-            StorageManager.hadoopFs.get(storageInfo).delete(
+            StorageManager.hadoopFs.get(storageType).delete(
               new Path(new Path(dir, conf.workerWorkingDir), 
s"$appId/$shuffleId"),
               true)
           } catch {
-            case e: Exception => logWarning("Clean expired DFS shuffle 
failed.", e)
+            case e: Exception => logWarning(s"Clean expired $storageType 
shuffle failed.", e)
           }
-        }
+        })
         if (workerGracefulShutdown) {
           committedFileInfos.remove(shuffleKey)
           if (cleanDB) {

Reply via email to