This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1bf9d30f3 [CELEBORN-2161] Support DB delete failure policy
1bf9d30f3 is described below
commit 1bf9d30f319dc160639fa96715b10c07e97f0785
Author: sychen <[email protected]>
AuthorDate: Mon Sep 29 15:50:41 2025 +0800
[CELEBORN-2161] Support DB delete failure policy
### What changes were proposed in this pull request?
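Introduce the configuration
`celeborn.worker.graceful.shutdown.dbDeleteFailurePolicy`, which controls how
the worker reacts when deleting a shuffle key from its local DB (RocksDB)
fails: `THROW` rethrows the exception, `EXIT` exits the worker process, and
`IGNORE` (the default) logs the error and continues.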
### Why are the changes needed?
When a RocksDB delete throws an exception, StorageManager#cleanupExpiredShuffleKey
may leave some shuffle files uncleaned, which can eventually fill up the worker's
data directory. Those files can then only be cleaned up by restarting the worker.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #3488 from cxzl25/CELEBORN-2161.
Authored-by: sychen <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 13 +++++++++++++
docs/configuration/worker.md | 1 +
.../deploy/worker/storage/StorageManager.scala | 22 +++++++++++++++++++++-
3 files changed, 35 insertions(+), 1 deletion(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index b3afc6179..ff296a98a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1357,6 +1357,8 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL)
def workerGracefulShutdownSaveCommittedFileInfoSync: Boolean =
get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC)
+ // Policy name applied when deleting a shuffle key from the worker's local DB
+ // fails; the entry upper-cases it and restricts it to THROW, EXIT or IGNORE.
+ def workerGracefulShutdownDbDeleteFailurePolicy: String =
+ get(WORKER_GRACEFUL_SHUTDOWN_DB_DELETE_FAILURE_POLICY)
// //////////////////////////////////////////////////////
// Flusher //
@@ -3929,6 +3931,17 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ // How a worker reacts when a DB delete for a shuffle key fails.
+ // The value is upper-cased (Locale.ROOT) before validation, so e.g. "exit" is accepted.
+ // Only THROW, EXIT and IGNORE are valid; the default is IGNORE.
+ val WORKER_GRACEFUL_SHUTDOWN_DB_DELETE_FAILURE_POLICY: ConfigEntry[String] =
+ buildConf("celeborn.worker.graceful.shutdown.dbDeleteFailurePolicy")
+ .categories("worker")
+ .doc("Policy for handling DB delete failures during graceful shutdown. "
+
+ "THROW: throw exception, EXIT: trigger graceful shutdown, IGNORE: log
error and continue (default).")
+ .version("0.7.0")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(Set("THROW", "EXIT", "IGNORE"))
+ .createWithDefault("IGNORE")
+
val WORKER_DISKTIME_SLIDINGWINDOW_SIZE: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.diskTime.slidingWindow.size")
.withAlternative("celeborn.worker.flusher.avgFlushTime.slidingWindow.size")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index b2343b55b..f4f4744d4 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -99,6 +99,7 @@ license: |
| celeborn.worker.flusher.threads | 16 | false | Flusher's thread count per
disk for unknown-type disks. | 0.2.0 | |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | 1s | false |
The wait interval of checking whether all released slots to be committed or
destroyed during worker graceful shutdown | 0.2.0 | |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | false
| The wait time of waiting for the released slots to be committed or destroyed
during worker graceful shutdown. | 0.2.0 | |
+| celeborn.worker.graceful.shutdown.dbDeleteFailurePolicy | IGNORE | false |
Policy for handling DB delete failures during graceful shutdown. THROW: throw
exception, EXIT: trigger graceful shutdown, IGNORE: log error and continue
(default). | 0.7.0 | |
| celeborn.worker.graceful.shutdown.enabled | false | false | When true,
during worker shutdown, the worker will wait for all released slots to be
committed or destroyed. | 0.2.0 | |
| celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | 120s |
false | The wait time of waiting for sorting partition files during worker
graceful shutdown. | 0.2.0 | |
| celeborn.worker.graceful.shutdown.recoverDbBackend | ROCKSDB | false |
Specifies a disk-based store used in local db. ROCKSDB or LEVELDB (deprecated).
| 0.4.0 | |
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index f2b045dd4..43eb3ca17 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -282,6 +282,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
conf.workerGracefulShutdownSaveCommittedFileInfoSync
private val saveCommittedFileInfoInterval =
conf.workerGracefulShutdownSaveCommittedFileInfoInterval
+ // Read once when the StorageManager is constructed. handleDbDeleteFailure
+ // consults it when db.delete of a shuffle key's entry throws.
+ private val dbDeleteFailurePolicy =
+ conf.workerGracefulShutdownDbDeleteFailurePolicy
private val committedFileInfos =
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String,
DiskFileInfo]]()
// ShuffleClient can fetch data from a restarted worker only
@@ -674,7 +676,13 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
if (workerGracefulShutdown) {
committedFileInfos.remove(shuffleKey)
if (cleanDB) {
- db.delete(dbShuffleKey(shuffleKey))
+ try {
+ db.delete(dbShuffleKey(shuffleKey))
+ } catch {
+ case e: Exception =>
+ logError(s"Failed to delete DB entry for shuffle key
$shuffleKey", e)
+ handleDbDeleteFailure(e, shuffleKey)
+ }
}
}
}
@@ -1230,6 +1238,18 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
mapFileMeta
}
}
+
+  /**
+   * Applies the configured `dbDeleteFailurePolicy` after deleting a shuffle key's
+   * entry from the local DB has failed. The caller has already logged the failure
+   * together with its stack trace.
+   *
+   * - EXIT: logs and terminates the JVM via `System.exit(-1)`.
+   * - IGNORE: logs a warning and returns normally.
+   * - THROW (and any value that bypassed config validation): rethrows `exception`.
+   *
+   * @param exception  the failure raised by `db.delete`
+   * @param shuffleKey the shuffle key whose DB entry could not be deleted
+   * @throws Exception rethrows `exception` unless the policy is EXIT or IGNORE
+   */
+  private def handleDbDeleteFailure(exception: Exception, shuffleKey: String): Unit = {
+    dbDeleteFailurePolicy match {
+      case "EXIT" =>
+        // Include the key so the cause of the exit is identifiable from this line alone.
+        logError(s"Triggering graceful shutdown due to DB delete failure for shuffle key $shuffleKey")
+        System.exit(-1)
+      case "IGNORE" =>
+        logWarning(s"DB delete failed for shuffle key $shuffleKey, but ignoring error and continuing execution")
+      case _ =>
+        // THROW, plus a fail-loud fallback for any unexpected policy value.
+        throw exception
+    }
+  }
}
object StorageManager {