This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 56bcbc026 [CELEBORN-1046] Add an expiration time configuration for app
directory to clean up
56bcbc026 is described below
commit 56bcbc026bb3a4655a777d8fc84fa80285a772a9
Author: sunjunjie <[email protected]>
AuthorDate: Tue Oct 17 19:23:49 2023 +0800
[CELEBORN-1046] Add an expiration time configuration for app directory to
clean up
### What changes were proposed in this pull request?
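Add a worker configuration "celeborn.worker.storage.expireDirs.timeout" with a
default value of 1h. This configuration sets the expiration time for app local
directories: an app directory that is not tracked in memory is only cleaned up
once its last-modified time is older than this timeout.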
https://issues.apache.org/jira/browse/CELEBORN-1046
### Why are the changes needed?
When Celeborn periodically cleans up app directories, it decides whether an
app's directory should be deleted based solely on the shuffleKeySet held in
memory. However, the absence of an app from that set does not reliably indicate
that the app has completed, so this check alone could lead to the unintentional
deletion of shuffle data.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1998 from wilsonjie/CELEBORN-1046.
Authored-by: sunjunjie <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 03498ce46b0900f7565c717565a6637e2d3341bb)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++
docs/configuration/worker.md | 1 +
.../service/deploy/worker/storage/StorageManager.scala | 10 ++++++----
3 files changed, 16 insertions(+), 4 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 60a5f7f58..249cce8f7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -907,6 +907,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
+ def workerStorageExpireDirTimeout: Long =
get(WORKER_STORAGE_EXPIRE_DIR_TIMEOUT)
def creditStreamThreadsPerMountpoint: Int =
get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT)
def workerDirectMemoryRatioForReadBuffer: Double =
get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
@@ -2050,6 +2051,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(16)
+ val WORKER_STORAGE_EXPIRE_DIR_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.storage.expireDirs.timeout")
+ .categories("worker")
+ .version("0.3.2")
+ .doc(s"The timeout for a expire dirs to be deleted on disk.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("1h")
+
val HDFS_DIR: OptionalConfigEntry[String] =
buildConf("celeborn.storage.hdfs.dir")
.categories("worker", "master", "client")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 3cc87e5c4..1afaba6e3 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -99,6 +99,7 @@ license: |
| celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per
retry for a worker to check if the working directory is cleaned up before
registering with the master. | 0.3.0 |
| celeborn.worker.storage.dirs | <undefined> | Directory list to store
shuffle data. It's recommended to configure one directory on each disk. Storage
size limit can be set for each directory. For the sake of performance, there
should be no more than 2 flush threads on the same disk partition if you are
using HDD, and should be 8 or more flush threads on the same disk partition if
you are using SSD. For example:
`dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktyp [...]
| celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved
space for each disk. | 0.3.0 |
+| celeborn.worker.storage.expireDirs.timeout | 1h | The timeout for a expire
dirs to be deleted on disk. | 0.3.2 |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's
working dir path name. | 0.3.0 |
| celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to
close | 0.2.0 |
| celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file
writer to create if its creation was failed. | 0.2.0 |
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 1147d1b34..c818a238a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -55,6 +55,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val hasHDFSStorage = conf.hasHDFSStorage
+ val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
+
// (deviceName -> deviceInfo) and (mount point -> diskInfo)
val (deviceInfos, diskInfos) = {
val workingDirInfos =
@@ -227,7 +229,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
saveCommittedFileInfoInterval,
TimeUnit.MILLISECONDS)
}
- cleanupExpiredAppDirs()
+ cleanupExpiredAppDirs(System.currentTimeMillis() - storageExpireDirTimeout)
if (!checkIfWorkingDirCleaned) {
logWarning(
"Worker still has residual files in the working directory before
registering with Master, " +
@@ -588,7 +590,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
override def run(): Unit = {
try {
// Clean up dirs which it's application is expired.
- cleanupExpiredAppDirs()
+ cleanupExpiredAppDirs(System.currentTimeMillis() -
storageExpireDirTimeout)
} catch {
case exception: Exception =>
logWarning(s"Cleanup expired shuffle data exception:
${exception.getMessage}")
@@ -599,7 +601,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
30,
TimeUnit.MINUTES)
- private def cleanupExpiredAppDirs(): Unit = {
+ private def cleanupExpiredAppDirs(expireDuration: Long): Unit = {
val diskInfoAndAppDirs = disksSnapshot()
.filter(_.status != DiskStatus.IO_HANG)
.map { case diskInfo =>
@@ -610,7 +612,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) =>
appDirs.foreach { appDir =>
// Don't delete shuffleKey's data that exist correct shuffle file info.
- if (!appIds.contains(appDir.getName)) {
+ if (!appIds.contains(appDir.getName) && appDir.lastModified() <
expireDuration) {
val threadPool = diskOperators.get(diskInfo.mountPoint)
deleteDirectory(appDir, threadPool)
logInfo(s"Delete expired app dir $appDir.")