This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 03498ce46 [CELEBORN-1046] Add an expiration time configuration for app
directory to clean up
03498ce46 is described below
commit 03498ce46b0900f7565c717565a6637e2d3341bb
Author: sunjunjie <[email protected]>
AuthorDate: Tue Oct 17 19:23:49 2023 +0800
[CELEBORN-1046] Add an expiration time configuration for app directory to
clean up
### What changes were proposed in this pull request?
Add a worker configuration "celeborn.worker.storage.expireDirs.timeout" with a
default value of 1h. This configuration sets the expiration time for app local
directories.
https://issues.apache.org/jira/browse/CELEBORN-1046
### Why are the changes needed?
When Celeborn periodically cleans up app directories, it decides whether an
app's directory should be deleted based only on the shuffleKeySet in memory.
However, that set does not reliably indicate that the app has completed, so
shuffle data could be deleted unintentionally. With this change, a directory is
deleted only if it is also older than the configured expiration timeout.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1998 from wilsonjie/CELEBORN-1046.
Authored-by: sunjunjie <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++
docs/configuration/worker.md | 1 +
.../service/deploy/worker/storage/StorageManager.scala | 10 ++++++----
3 files changed, 16 insertions(+), 4 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ae05a38ae..aa39db44f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -910,6 +910,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
+ def workerStorageExpireDirTimeout: Long =
get(WORKER_STORAGE_EXPIRE_DIR_TIMEOUT)
def creditStreamThreadsPerMountpoint: Int =
get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT)
def workerDirectMemoryRatioForReadBuffer: Double =
get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
@@ -2083,6 +2084,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(16)
+ val WORKER_STORAGE_EXPIRE_DIR_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.storage.expireDirs.timeout")
+ .categories("worker")
+ .version("0.3.2")
+ .doc(s"The timeout for a expire dirs to be deleted on disk.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("1h")
+
val HDFS_DIR: OptionalConfigEntry[String] =
buildConf("celeborn.storage.hdfs.dir")
.categories("worker", "master", "client")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 21f0665c7..62c6e5a15 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -102,6 +102,7 @@ license: |
| celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per
retry for a worker to check if the working directory is cleaned up before
registering with the master. | 0.3.0 |
| celeborn.worker.storage.dirs | <undefined> | Directory list to store
shuffle data. It's recommended to configure one directory on each disk. Storage
size limit can be set for each directory. For the sake of performance, there
should be no more than 2 flush threads on the same disk partition if you are
using HDD, and should be 8 or more flush threads on the same disk partition if
you are using SSD. For example:
`dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktyp [...]
| celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved
space for each disk. | 0.3.0 |
+| celeborn.worker.storage.expireDirs.timeout | 1h | The timeout for a expire
dirs to be deleted on disk. | 0.3.2 |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's
working dir path name. | 0.3.0 |
| celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to
close | 0.2.0 |
| celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file
writer to create if its creation was failed. | 0.2.0 |
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index b1f850f8d..c836e98e6 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -55,6 +55,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val hasHDFSStorage = conf.hasHDFSStorage
+ val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
+
// (deviceName -> deviceInfo) and (mount point -> diskInfo)
val (deviceInfos, diskInfos) = {
val workingDirInfos =
@@ -230,7 +232,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
saveCommittedFileInfoInterval,
TimeUnit.MILLISECONDS)
}
- cleanupExpiredAppDirs()
+ cleanupExpiredAppDirs(System.currentTimeMillis() - storageExpireDirTimeout)
if (!checkIfWorkingDirCleaned) {
logWarning(
"Worker still has residual files in the working directory before
registering with Master, " +
@@ -591,7 +593,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
override def run(): Unit = {
try {
// Clean up dirs which it's application is expired.
- cleanupExpiredAppDirs()
+ cleanupExpiredAppDirs(System.currentTimeMillis() -
storageExpireDirTimeout)
} catch {
case exception: Exception =>
logWarning(s"Cleanup expired shuffle data exception:
${exception.getMessage}")
@@ -602,7 +604,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
30,
TimeUnit.MINUTES)
- private def cleanupExpiredAppDirs(): Unit = {
+ private def cleanupExpiredAppDirs(expireDuration: Long): Unit = {
val diskInfoAndAppDirs = disksSnapshot()
.filter(_.status != DiskStatus.IO_HANG)
.map { case diskInfo =>
@@ -613,7 +615,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) =>
appDirs.foreach { appDir =>
// Don't delete shuffleKey's data that exist correct shuffle file info.
- if (!appIds.contains(appDir.getName)) {
+ if (!appIds.contains(appDir.getName) && appDir.lastModified() <
expireDuration) {
val threadPool = diskOperators.get(diskInfo.mountPoint)
deleteDirectory(appDir, threadPool)
logInfo(s"Delete expired app dir $appDir.")