This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c6e68fddf [CELEBORN-2053] Refactor remote storage configuration usage
c6e68fddf is described below

commit c6e68fddfa86e8d010a7f26f8ed0e98d30d3bfd0
Author: Kalvin2077 <[email protected]>
AuthorDate: Mon Jul 28 16:56:32 2025 +0800

    [CELEBORN-2053] Refactor remote storage configuration usage
    
    ### What changes were proposed in this pull request?
    
    Refactoring similar code about configuration usage.
    
    ### Why are the changes needed?
    
    Improve scalability for possible new remote storage in the future.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests.
    
    Closes #3353 from Kalvin2077/draft.
    
    Authored-by: Kalvin2077 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 27 +++++++++++++++++++++-
 .../celeborn/common/util/CelebornHadoopUtils.scala | 21 ++++++++---------
 .../apache/celeborn/common/CelebornConfSuite.scala |  2 +-
 .../master/clustermeta/AbstractMetaManager.java    | 13 +++--------
 .../celeborn/service/deploy/master/Master.scala    | 14 +++++------
 .../service/deploy/worker/Controller.scala         |  3 ++-
 .../celeborn/service/deploy/worker/Worker.scala    |  2 +-
 .../deploy/worker/storage/StorageManager.scala     | 26 ++++++++-------------
 8 files changed, 57 insertions(+), 51 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 21f98745d..9d7184c96 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1195,7 +1195,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
         (dir, maxCapacity, flushThread, diskType)
       }
     }.getOrElse {
-      if (!hasHDFSStorage && !hasS3Storage && !hasOssStorage) {
+      if (remoteStorageDirs.isEmpty) {
         val prefix = workerStorageBaseDirPrefix
         val number = workerStorageBaseDirNumber
         val diskType = Type.valueOf(workerStorageBaseDirDiskType)
@@ -1264,6 +1264,31 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
     }.getOrElse("")
   }
 
+  def remoteStorageDirs: Option[Set[(StorageInfo.Type, String)]] = {
+    val supported = Seq(
+      (StorageInfo.Type.HDFS, HDFS_DIR, Utils.isHdfsPath _),
+      (StorageInfo.Type.S3, S3_DIR, Utils.isS3Path _),
+      (StorageInfo.Type.OSS, OSS_DIR, Utils.isOssPath _))
+
+    val activeStorageTypes =
+      get(ACTIVE_STORAGE_TYPES).split(",").map(StorageInfo.Type.valueOf).toList
+
+    val validDirs = supported.flatMap { case (ty, e, checker) =>
+      if (!activeStorageTypes.contains(ty)) None
+      else {
+        get(e).flatMap { dir =>
+          if (checker(dir)) Some((ty, dir))
+          else {
+            log.error(s"${e.key} configuration is invalid: $dir. Disabling $ty 
support.")
+            None
+          }
+        }
+      }
+    }.toSet
+
+    if (validDirs.nonEmpty) Some(validDirs) else None
+  }
+
   def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
   def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
   def workerStorageBaseDirDiskType: String = 
get(WORKER_STORAGE_BASE_DIR_DISK_TYPE)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 047012ef6..2940afcd2 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -94,18 +94,15 @@ object CelebornHadoopUtils extends Logging {
     val hadoopConf = newConfiguration(conf)
     initKerberos(conf, hadoopConf)
     val hadoopFs = new java.util.HashMap[StorageInfo.Type, FileSystem]()
-    if (conf.hasHDFSStorage) {
-      val hdfsDir = new Path(conf.hdfsDir)
-      hadoopFs.put(StorageInfo.Type.HDFS, hdfsDir.getFileSystem(hadoopConf))
-    }
-    if (conf.hasS3Storage) {
-      val s3Dir = new Path(conf.s3Dir)
-      hadoopFs.put(StorageInfo.Type.S3, s3Dir.getFileSystem(hadoopConf))
-    }
-    if (conf.hasOssStorage) {
-      val ossDir = new Path(conf.ossDir)
-      hadoopFs.put(StorageInfo.Type.OSS, ossDir.getFileSystem(hadoopConf))
-    }
+
+    conf.remoteStorageDirs.foreach(dirs =>
+      dirs.foreach {
+        case (storageType, dir) => {
+          val path = new Path(dir)
+          hadoopFs.put(storageType, path.getFileSystem(hadoopConf))
+        }
+      })
+
     hadoopFs
   }
 
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 2fedcdb32..ff2584ac6 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -207,7 +207,7 @@ class CelebornConfSuite extends CelebornFunSuite {
     assert(!conf.workerBaseDirs.isEmpty)
 
     conf.set("celeborn.storage.availableTypes", "S3")
-    conf.set("celeborn.storage.s3.dir", "s3a:///xxx")
+    conf.set("celeborn.storage.s3.dir", "s3a://xxx")
     assert(conf.workerBaseDirs.isEmpty)
   }
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index a56b8beb0..361bcbe08 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -287,23 +287,16 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     long unhealthyDiskNum =
         disks.values().stream().filter(s -> 
!s.status().equals(DiskStatus.HEALTHY)).count();
     boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >= 
unhealthyDiskRatioThreshold;
+    boolean remoteStorageDirsDefined = conf.remoteStorageDirs().isDefined();
     if (!excludedWorkers.contains(worker)
-        && (((disks.isEmpty() || exceed)
-                && !conf.hasHDFSStorage()
-                && !conf.hasS3Storage()
-                && !conf.hasOssStorage())
-            || highWorkload)) {
+        && (((disks.isEmpty() || exceed) && !remoteStorageDirsDefined) || 
highWorkload)) {
       LOG.warn(
           "Worker {} (unhealthy disks num: {}, high workload: {}) adds to 
excluded workers",
           worker,
           unhealthyDiskNum,
           highWorkload);
       excludedWorkers.add(worker);
-    } else if ((availableSlots.get() > 0
-            || conf.hasHDFSStorage()
-            || conf.hasS3Storage()
-            || conf.hasOssStorage())
-        && !highWorkload) {
+    } else if ((availableSlots.get() > 0 || remoteStorageDirsDefined) && 
!highWorkload) {
       // only unblack if numSlots larger than 0
       excludedWorkers.remove(worker);
     }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 00fbd220e..9446e6368 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -188,9 +188,7 @@ private[celeborn] class Master(
   private val denyWorkerHostPattern = conf.denyWorkerHostPattern
 
   private val dfsExpireDirsTimeoutMS = conf.dfsExpireDirsTimeoutMS
-  private val hasHDFSStorage = conf.hasHDFSStorage
-  private val hasS3Storage = conf.hasS3Storage
-  private val hasOssStorage = conf.hasOssStorage
+  private val remoteStorageDirs = conf.remoteStorageDirs
 
   private val quotaManager = new QuotaManager(
     statusSystem,
@@ -349,7 +347,7 @@ private[celeborn] class Master(
         CheckForWorkerUnavailableInfoTimeout)
     }
 
-    if (hasHDFSStorage || hasS3Storage || hasOssStorage) {
+    if (remoteStorageDirs.isDefined) {
       checkForDFSRemnantDirsTimeOutTask =
         scheduleCheckTask(dfsExpireDirsTimeoutMS, 
CheckForDFSExpiredDirsTimeout)
     }
@@ -1106,7 +1104,7 @@ private[celeborn] class Master(
         statusSystem.handleAppLost(appId, requestId)
         quotaManager.handleAppLost(appId)
         logInfo(s"Removed application $appId")
-        if (hasHDFSStorage || hasS3Storage || hasOssStorage) {
+        if (remoteStorageDirs.isDefined) {
           checkAndCleanExpiredAppDirsOnDFS(appId)
         }
         if (context != null) {
@@ -1126,9 +1124,9 @@ private[celeborn] class Master(
           throw e
       }
     }
-    if (hasHDFSStorage) processDir(conf.hdfsDir, expiredDir)
-    if (hasS3Storage) processDir(conf.s3Dir, expiredDir)
-    if (hasOssStorage) processDir(conf.ossDir, expiredDir)
+    remoteStorageDirs.foreach(_.foreach {
+      case (_, dir) => processDir(dir, expiredDir)
+    })
   }
 
   private def processDir(dfsDir: String, expiredDir: String): Unit = {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index f01792729..f4327cf5a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -70,6 +70,7 @@ private[deploy] class Controller(
   val mockCommitFilesFailure = conf.testMockCommitFilesFailure
   val shuffleCommitTimeout = conf.workerShuffleCommitTimeout
   val workerCommitFilesCheckInterval = conf.workerCommitFilesCheckInterval
+  val remoteStorageDirs = conf.remoteStorageDirs
 
   def init(worker: Worker): Unit = {
     storageManager = worker.storageManager
@@ -186,7 +187,7 @@ private[deploy] class Controller(
       return
     }
 
-    if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage 
&& !conf.hasS3Storage && !conf.hasOssStorage) {
+    if (storageManager.healthyWorkingDirs().size <= 0 && 
remoteStorageDirs.isEmpty) {
       val msg = "Local storage has no available dirs!"
       logError(s"[handleReserveSlots] $msg")
       context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, 
msg))
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 79928df44..fe70fab67 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -489,7 +489,7 @@ private[celeborn] class Worker(
     val diskInfos =
       workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { 
disk =>
         disk.mountPoint -> disk
-      }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++ 
storageManager.s3DiskInfo ++ storageManager.ossDiskInfo
+      }.toMap.asJava).values().asScala.toSeq ++ 
storageManager.remoteDiskInfos.getOrElse(Set.empty)
     workerStatusManager.checkIfNeedTransitionStatus()
     val response = masterClient.askSync[HeartbeatFromWorkerResponse](
       HeartbeatFromWorker(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index b71110947..bf3791343 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -68,10 +68,9 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, 
MemoryFileInfo]]()
 
   val hasHDFSStorage = conf.hasHDFSStorage
-
   val hasS3Storage = conf.hasS3Storage
-
   val hasOssStorage = conf.hasOssStorage
+  val remoteStorageDirs = conf.remoteStorageDirs
 
   val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
   val storagePolicy = new StoragePolicy(conf, this, workerSource)
@@ -86,27 +85,20 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         (new File(workdir, conf.workerWorkingDir), maxSpace, flusherThread, 
storageType)
       }
 
-    if (workingDirInfos.size <= 0 && !hasHDFSStorage && !hasS3Storage && 
!hasOssStorage) {
+    if (workingDirInfos.size <= 0 && remoteStorageDirs.isEmpty) {
       throw new IOException("Empty working directory configuration!")
     }
 
     DeviceInfo.getDeviceAndDiskInfos(workingDirInfos, conf)
   }
   val mountPoints = new util.HashSet[String](diskInfos.keySet())
-  val hdfsDiskInfo =
-    if (conf.hasHDFSStorage)
-      Option(new DiskInfo("HDFS", Long.MaxValue, 999999, 999999, 0, 
StorageInfo.Type.HDFS))
-    else None
-
-  val s3DiskInfo =
-    if (conf.hasS3Storage)
-      Option(new DiskInfo("S3", Long.MaxValue, 999999, 999999, 0, 
StorageInfo.Type.S3))
-    else None
 
-  val ossDiskInfo =
-    if (conf.hasOssStorage)
-      Option(new DiskInfo("OSS", Long.MaxValue, 999999, 999999, 0, 
StorageInfo.Type.OSS))
-    else None
+  val remoteDiskInfos: Option[Set[DiskInfo]] = remoteStorageDirs.flatMap { 
dirs =>
+    val diskInfoSet = dirs.map { case (storageInfoType, _) =>
+      new DiskInfo(storageInfoType.name, Long.MaxValue, 999999, 999999, 0, 
storageInfoType)
+    }
+    if (diskInfoSet.nonEmpty) Some(diskInfoSet) else None
+  }
 
   def disksSnapshot(): List[DiskInfo] = {
     diskInfos.synchronized {
@@ -449,7 +441,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       userIdentifier: UserIdentifier,
       partitionSplitEnabled: Boolean,
       isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
-    if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage && 
!hasOssStorage) {
+    if (healthyWorkingDirs().isEmpty && remoteStorageDirs.isEmpty) {
       throw new IOException("No available working dirs!")
     }
     val partitionDataWriterContext = new PartitionDataWriterContext(

Reply via email to