This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new d8270cb35 [CELEBORN-2159] Fix dfs storage type check in
StorageManager#cleanupExpiredShuffleKey
d8270cb35 is described below
commit d8270cb35e1ff32fed49929a96b174596bf0a957
Author: xxx <[email protected]>
AuthorDate: Tue Oct 14 10:23:33 2025 +0800
[CELEBORN-2159] Fix dfs storage type check in
StorageManager#cleanupExpiredShuffleKey
### What changes were proposed in this pull request?
Fix dfs storage type check in `StorageManager#cleanupExpiredShuffleKey`.
### Why are the changes needed?
When multiple DFS files need to be cleaned up, the values of isHdfs and
isOss are overwritten by the last processed file, rather than being
determined by the storage types of the files that actually need cleanup. This
may lead to cleaning up incorrect DFS storage paths.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3486 from xy2953396112/CELEBORN-2159.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 9482808c3180f4e057aa35c9e97c257095c00446)
Signed-off-by: SteNicholas <[email protected]>
---
.../deploy/worker/storage/StorageManager.scala | 27 ++++++++--------------
1 file changed, 10 insertions(+), 17 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 636941f3f..170cc124e 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -26,10 +26,11 @@ import java.util.concurrent.atomic.{AtomicInteger,
AtomicLong}
import java.util.function.{BiConsumer, IntUnaryOperator}
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.concurrent.duration._
import com.google.common.annotations.VisibleForTesting
-import io.netty.buffer.{ByteBufAllocator, PooledByteBufAllocator}
+import io.netty.buffer.ByteBufAllocator
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
@@ -638,16 +639,12 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
logInfo(s"Cleanup expired shuffle $shuffleKey.")
if (diskFileInfos.containsKey(shuffleKey)) {
val removedFileInfos = diskFileInfos.remove(shuffleKey)
- var isDfsExpired = false
- var isHdfs = false
- var isOss = false
+ val expireStorageTypes = mutable.Set[StorageInfo.Type]()
if (removedFileInfos != null) {
removedFileInfos.asScala.foreach {
case (_, fileInfo) =>
if (cleanFileInternal(shuffleKey, fileInfo)) {
- isDfsExpired = true
- isHdfs = fileInfo.isHdfs
- isOss = fileInfo.isOSS
+ expireStorageTypes.add(fileInfo.getStorageType)
}
}
}
@@ -660,23 +657,19 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
deleteDirectory(file, diskOperators.get(diskInfo.mountPoint))
}
}
- if (isDfsExpired) {
+ expireStorageTypes.foreach(storageType => {
try {
val dir =
- if (hasHDFSStorage && isHdfs) hdfsDir
- else if (hasOssStorage && isOss) ossDir
+ if (storageType == StorageInfo.Type.HDFS) hdfsDir
+ else if (storageType == StorageInfo.Type.OSS) ossDir
else s3Dir
- val storageInfo =
- if (hasHDFSStorage && isHdfs) StorageInfo.Type.HDFS
- else if (hasOssStorage && isOss) StorageInfo.Type.OSS
- else StorageInfo.Type.S3
- StorageManager.hadoopFs.get(storageInfo).delete(
+ StorageManager.hadoopFs.get(storageType).delete(
new Path(new Path(dir, conf.workerWorkingDir),
s"$appId/$shuffleId"),
true)
} catch {
- case e: Exception => logWarning("Clean expired DFS shuffle
failed.", e)
+ case e: Exception => logWarning(s"Clean expired $storageType
shuffle failed.", e)
}
- }
+ })
if (workerGracefulShutdown) {
committedFileInfos.remove(shuffleKey)
if (cleanDB) {