This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new af1c00856 [CELEBORN-2271] StorageManager#saveCommittedFileInfosExecutor should call shutdown before awaitTermination
af1c00856 is described below
commit af1c008562eafa51ae4882eb54467b8a55c0c685
Author: Prateek Srivastava <[email protected]>
AuthorDate: Thu Feb 26 11:15:53 2026 +0800
[CELEBORN-2271] StorageManager#saveCommittedFileInfosExecutor should call shutdown before awaitTermination
### What changes were proposed in this pull request?
Call saveCommittedFileInfosExecutor.shutdown() before awaitTermination() in
saveAllCommittedFileInfosToDB() so the executor shuts down correctly during
worker shutdown.
### Why are the changes needed?
awaitTermination() only waits for the executor to finish after a shutdown
has been requested. Without shutdown(), the executor keeps running and can
still accept new tasks, so awaitTermination() just blocks until the timeout
expires.
From [ExecutorService
documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination-long-java.util.concurrent.TimeUnit-):
> Blocks until all tasks have completed execution "after a shutdown
request", ...
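
The behavior described above can be reproduced in isolation. The following is a minimal sketch, not code from Celeborn: the object name `AwaitTerminationSketch`, the `demo` helper, and the timeout values are hypothetical and chosen only for illustration. It assumes a plain single-thread executor from `java.util.concurrent.Executors`.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical standalone sketch; not part of StorageManager.
object AwaitTerminationSketch {

  // Returns (terminated without shutdown, terminated after shutdown).
  def demo(): (Boolean, Boolean) = {
    val executor = Executors.newSingleThreadExecutor()
    executor.execute(new Runnable { def run(): Unit = () })

    // No shutdown has been requested, so the executor cannot reach the
    // terminated state; this call waits out the timeout and returns false.
    val withoutShutdown = executor.awaitTermination(200, TimeUnit.MILLISECONDS)

    // After shutdown(), no new tasks are accepted and the executor
    // terminates once the already submitted task has finished.
    executor.shutdown()
    val afterShutdown = executor.awaitTermination(5, TimeUnit.SECONDS)

    (withoutShutdown, afterShutdown)
  }

  def main(args: Array[String]): Unit = {
    val (withoutShutdown, afterShutdown) = demo()
    println(s"terminated without shutdown: $withoutShutdown")
    println(s"terminated after shutdown: $afterShutdown")
  }
}
```

The first call is the pattern this change removes: the wait always consumes the full timeout regardless of whether the submitted work has already completed.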
### Does this PR resolve a correctness bug?
Possibly. Without the shutdown() call, executor tasks could still be writing
to RocksDB during worker shutdown, and the resulting race could cause data
loss or correctness issues.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No new tests were added. The change should be exercised by existing tests,
which guard against regressions.
Closes #3607 from f2prateek/fix-shutdown.
Authored-by: Prateek Srivastava <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 4d97a8560a3bf5a839c14282a111eaf54bdac35f)
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/service/deploy/worker/storage/StorageManager.scala | 1 +
1 file changed, 1 insertion(+)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 170cc124e..f4831bf15 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -368,6 +368,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
def saveAllCommittedFileInfosToDB(): Unit = {
+ saveCommittedFileInfosExecutor.shutdown()
// save committed fileinfo to DB should be done within the time of saveCommittedFileInfoInterval
saveCommittedFileInfosExecutor.awaitTermination(saveCommittedFileInfoInterval, MILLISECONDS)
// graceful shutdown might be timed out, persist all committed fileinfos to DB