This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c6e68fddf [CELEBORN-2053] Refactor remote storage configuration usage
c6e68fddf is described below
commit c6e68fddfa86e8d010a7f26f8ed0e98d30d3bfd0
Author: Kalvin2077 <[email protected]>
AuthorDate: Mon Jul 28 16:56:32 2025 +0800
[CELEBORN-2053] Refactor remote storage configuration usage
### What changes were proposed in this pull request?
Refactors the duplicated remote storage configuration checks (HDFS/S3/OSS) into a shared `remoteStorageDirs` helper in `CelebornConf`.
### Why are the changes needed?
Improve scalability for possible new remote storage in the future.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes #3353 from Kalvin2077/draft.
Authored-by: Kalvin2077 <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 27 +++++++++++++++++++++-
.../celeborn/common/util/CelebornHadoopUtils.scala | 21 ++++++++---------
.../apache/celeborn/common/CelebornConfSuite.scala | 2 +-
.../master/clustermeta/AbstractMetaManager.java | 13 +++--------
.../celeborn/service/deploy/master/Master.scala | 14 +++++------
.../service/deploy/worker/Controller.scala | 3 ++-
.../celeborn/service/deploy/worker/Worker.scala | 2 +-
.../deploy/worker/storage/StorageManager.scala | 26 ++++++++-------------
8 files changed, 57 insertions(+), 51 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 21f98745d..9d7184c96 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1195,7 +1195,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
(dir, maxCapacity, flushThread, diskType)
}
}.getOrElse {
- if (!hasHDFSStorage && !hasS3Storage && !hasOssStorage) {
+ if (remoteStorageDirs.isEmpty) {
val prefix = workerStorageBaseDirPrefix
val number = workerStorageBaseDirNumber
val diskType = Type.valueOf(workerStorageBaseDirDiskType)
@@ -1264,6 +1264,31 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
}.getOrElse("")
}
+ def remoteStorageDirs: Option[Set[(StorageInfo.Type, String)]] = {
+ val supported = Seq(
+ (StorageInfo.Type.HDFS, HDFS_DIR, Utils.isHdfsPath _),
+ (StorageInfo.Type.S3, S3_DIR, Utils.isS3Path _),
+ (StorageInfo.Type.OSS, OSS_DIR, Utils.isOssPath _))
+
+ val activeStorageTypes =
+ get(ACTIVE_STORAGE_TYPES).split(",").map(StorageInfo.Type.valueOf).toList
+
+ val validDirs = supported.flatMap { case (ty, e, checker) =>
+ if (!activeStorageTypes.contains(ty)) None
+ else {
+ get(e).flatMap { dir =>
+ if (checker(dir)) Some((ty, dir))
+ else {
+ log.error(s"${e.key} configuration is invalid: $dir. Disabling $ty
support.")
+ None
+ }
+ }
+ }
+ }.toSet
+
+ if (validDirs.nonEmpty) Some(validDirs) else None
+ }
+
def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
def workerStorageBaseDirDiskType: String =
get(WORKER_STORAGE_BASE_DIR_DISK_TYPE)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 047012ef6..2940afcd2 100644
---
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -94,18 +94,15 @@ object CelebornHadoopUtils extends Logging {
val hadoopConf = newConfiguration(conf)
initKerberos(conf, hadoopConf)
val hadoopFs = new java.util.HashMap[StorageInfo.Type, FileSystem]()
- if (conf.hasHDFSStorage) {
- val hdfsDir = new Path(conf.hdfsDir)
- hadoopFs.put(StorageInfo.Type.HDFS, hdfsDir.getFileSystem(hadoopConf))
- }
- if (conf.hasS3Storage) {
- val s3Dir = new Path(conf.s3Dir)
- hadoopFs.put(StorageInfo.Type.S3, s3Dir.getFileSystem(hadoopConf))
- }
- if (conf.hasOssStorage) {
- val ossDir = new Path(conf.ossDir)
- hadoopFs.put(StorageInfo.Type.OSS, ossDir.getFileSystem(hadoopConf))
- }
+
+ conf.remoteStorageDirs.foreach(dirs =>
+ dirs.foreach {
+ case (storageType, dir) => {
+ val path = new Path(dir)
+ hadoopFs.put(storageType, path.getFileSystem(hadoopConf))
+ }
+ })
+
hadoopFs
}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 2fedcdb32..ff2584ac6 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -207,7 +207,7 @@ class CelebornConfSuite extends CelebornFunSuite {
assert(!conf.workerBaseDirs.isEmpty)
conf.set("celeborn.storage.availableTypes", "S3")
- conf.set("celeborn.storage.s3.dir", "s3a:///xxx")
+ conf.set("celeborn.storage.s3.dir", "s3a://xxx")
assert(conf.workerBaseDirs.isEmpty)
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index a56b8beb0..361bcbe08 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -287,23 +287,16 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
long unhealthyDiskNum =
disks.values().stream().filter(s ->
!s.status().equals(DiskStatus.HEALTHY)).count();
boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >=
unhealthyDiskRatioThreshold;
+ boolean remoteStorageDirsDefined = conf.remoteStorageDirs().isDefined();
if (!excludedWorkers.contains(worker)
- && (((disks.isEmpty() || exceed)
- && !conf.hasHDFSStorage()
- && !conf.hasS3Storage()
- && !conf.hasOssStorage())
- || highWorkload)) {
+ && (((disks.isEmpty() || exceed) && !remoteStorageDirsDefined) ||
highWorkload)) {
LOG.warn(
"Worker {} (unhealthy disks num: {}, high workload: {}) adds to
excluded workers",
worker,
unhealthyDiskNum,
highWorkload);
excludedWorkers.add(worker);
- } else if ((availableSlots.get() > 0
- || conf.hasHDFSStorage()
- || conf.hasS3Storage()
- || conf.hasOssStorage())
- && !highWorkload) {
+ } else if ((availableSlots.get() > 0 || remoteStorageDirsDefined) &&
!highWorkload) {
// only unblack if numSlots larger than 0
excludedWorkers.remove(worker);
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 00fbd220e..9446e6368 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -188,9 +188,7 @@ private[celeborn] class Master(
private val denyWorkerHostPattern = conf.denyWorkerHostPattern
private val dfsExpireDirsTimeoutMS = conf.dfsExpireDirsTimeoutMS
- private val hasHDFSStorage = conf.hasHDFSStorage
- private val hasS3Storage = conf.hasS3Storage
- private val hasOssStorage = conf.hasOssStorage
+ private val remoteStorageDirs = conf.remoteStorageDirs
private val quotaManager = new QuotaManager(
statusSystem,
@@ -349,7 +347,7 @@ private[celeborn] class Master(
CheckForWorkerUnavailableInfoTimeout)
}
- if (hasHDFSStorage || hasS3Storage || hasOssStorage) {
+ if (remoteStorageDirs.isDefined) {
checkForDFSRemnantDirsTimeOutTask =
scheduleCheckTask(dfsExpireDirsTimeoutMS,
CheckForDFSExpiredDirsTimeout)
}
@@ -1106,7 +1104,7 @@ private[celeborn] class Master(
statusSystem.handleAppLost(appId, requestId)
quotaManager.handleAppLost(appId)
logInfo(s"Removed application $appId")
- if (hasHDFSStorage || hasS3Storage || hasOssStorage) {
+ if (remoteStorageDirs.isDefined) {
checkAndCleanExpiredAppDirsOnDFS(appId)
}
if (context != null) {
@@ -1126,9 +1124,9 @@ private[celeborn] class Master(
throw e
}
}
- if (hasHDFSStorage) processDir(conf.hdfsDir, expiredDir)
- if (hasS3Storage) processDir(conf.s3Dir, expiredDir)
- if (hasOssStorage) processDir(conf.ossDir, expiredDir)
+ remoteStorageDirs.foreach(_.foreach {
+ case (_, dir) => processDir(dir, expiredDir)
+ })
}
private def processDir(dfsDir: String, expiredDir: String): Unit = {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index f01792729..f4327cf5a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -70,6 +70,7 @@ private[deploy] class Controller(
val mockCommitFilesFailure = conf.testMockCommitFilesFailure
val shuffleCommitTimeout = conf.workerShuffleCommitTimeout
val workerCommitFilesCheckInterval = conf.workerCommitFilesCheckInterval
+ val remoteStorageDirs = conf.remoteStorageDirs
def init(worker: Worker): Unit = {
storageManager = worker.storageManager
@@ -186,7 +187,7 @@ private[deploy] class Controller(
return
}
- if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage
&& !conf.hasS3Storage && !conf.hasOssStorage) {
+ if (storageManager.healthyWorkingDirs().size <= 0 &&
remoteStorageDirs.isEmpty) {
val msg = "Local storage has no available dirs!"
logError(s"[handleReserveSlots] $msg")
context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR,
msg))
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 79928df44..fe70fab67 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -489,7 +489,7 @@ private[celeborn] class Worker(
val diskInfos =
workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map {
disk =>
disk.mountPoint -> disk
- }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++
storageManager.s3DiskInfo ++ storageManager.ossDiskInfo
+ }.toMap.asJava).values().asScala.toSeq ++
storageManager.remoteDiskInfos.getOrElse(Set.empty)
workerStatusManager.checkIfNeedTransitionStatus()
val response = masterClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index b71110947..bf3791343 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -68,10 +68,9 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String,
MemoryFileInfo]]()
val hasHDFSStorage = conf.hasHDFSStorage
-
val hasS3Storage = conf.hasS3Storage
-
val hasOssStorage = conf.hasOssStorage
+ val remoteStorageDirs = conf.remoteStorageDirs
val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
val storagePolicy = new StoragePolicy(conf, this, workerSource)
@@ -86,27 +85,20 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
(new File(workdir, conf.workerWorkingDir), maxSpace, flusherThread,
storageType)
}
- if (workingDirInfos.size <= 0 && !hasHDFSStorage && !hasS3Storage &&
!hasOssStorage) {
+ if (workingDirInfos.size <= 0 && remoteStorageDirs.isEmpty) {
throw new IOException("Empty working directory configuration!")
}
DeviceInfo.getDeviceAndDiskInfos(workingDirInfos, conf)
}
val mountPoints = new util.HashSet[String](diskInfos.keySet())
- val hdfsDiskInfo =
- if (conf.hasHDFSStorage)
- Option(new DiskInfo("HDFS", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.HDFS))
- else None
-
- val s3DiskInfo =
- if (conf.hasS3Storage)
- Option(new DiskInfo("S3", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.S3))
- else None
- val ossDiskInfo =
- if (conf.hasOssStorage)
- Option(new DiskInfo("OSS", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.OSS))
- else None
+ val remoteDiskInfos: Option[Set[DiskInfo]] = remoteStorageDirs.flatMap {
dirs =>
+ val diskInfoSet = dirs.map { case (storageInfoType, _) =>
+ new DiskInfo(storageInfoType.name, Long.MaxValue, 999999, 999999, 0,
storageInfoType)
+ }
+ if (diskInfoSet.nonEmpty) Some(diskInfoSet) else None
+ }
def disksSnapshot(): List[DiskInfo] = {
diskInfos.synchronized {
@@ -449,7 +441,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
userIdentifier: UserIdentifier,
partitionSplitEnabled: Boolean,
isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
- if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage &&
!hasOssStorage) {
+ if (healthyWorkingDirs().isEmpty && remoteStorageDirs.isEmpty) {
throw new IOException("No available working dirs!")
}
val partitionDataWriterContext = new PartitionDataWriterContext(