This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new af1c00856 [CELEBORN-2271] StorageManager#saveCommittedFileInfosExecutor should call shutdown before awaitTermination
af1c00856 is described below

commit af1c008562eafa51ae4882eb54467b8a55c0c685
Author: Prateek Srivastava <[email protected]>
AuthorDate: Thu Feb 26 11:15:53 2026 +0800

    [CELEBORN-2271] StorageManager#saveCommittedFileInfosExecutor should call shutdown before awaitTermination
    
    ### What changes were proposed in this pull request?
    Call saveCommittedFileInfosExecutor.shutdown() before awaitTermination() in saveAllCommittedFileInfosToDB() so the executor shuts down correctly during worker shutdown.
    
    ### Why are the changes needed?
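    awaitTermination() does not request a shutdown itself; it only blocks until the executor terminates after a shutdown has been requested, or until the timeout elapses. Without a preceding shutdown(), the executor keeps running and can schedule more work, so the call just waits out the timeout.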
    
    From [ExecutorService documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination-long-java.util.concurrent.TimeUnit-):
    
    > Blocks until all tasks have completed execution "after a shutdown 
request", ...
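
The difference can be seen in isolation with a minimal sketch. It is not Celeborn code: the class and method names below are hypothetical, and a plain fixed thread pool stands in for saveCommittedFileInfosExecutor. It only assumes the documented java.util.concurrent behaviour quoted above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Celeborn.
public class AwaitTerminationDemo {

    // awaitTermination() with no prior shutdown(): the pool never terminates,
    // so the call waits for the whole timeout and returns false.
    static boolean awaitWithoutShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            executor.submit(() -> { });
            return executor.awaitTermination(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Clean up the still-running pool so the JVM can exit.
            executor.shutdownNow();
        }
    }

    // shutdown() first: no new tasks are accepted, queued tasks finish,
    // and awaitTermination() returns true once the pool has terminated.
    static boolean awaitAfterShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            executor.submit(() -> { });
            executor.shutdown();
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without shutdown: " + awaitWithoutShutdown());
        System.out.println("after shutdown:   " + awaitAfterShutdown());
    }
}
```

In the first method the task itself finishes almost immediately, yet the call still reports false, because termination is a property of the executor rather than of its tasks.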
    
    ### Does this PR resolve a correctness bug?
    
    Possibly. Without the shutdown() call, the executor can still be writing to RocksDB while the worker shuts down, and the resulting race could cause data loss or correctness issues.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Should be exercised by existing tests, which guard against a regression.
    
    Closes #3607 from f2prateek/fix-shutdown.
    
    Authored-by: Prateek Srivastava <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 4d97a8560a3bf5a839c14282a111eaf54bdac35f)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../apache/celeborn/service/deploy/worker/storage/StorageManager.scala   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 170cc124e..f4831bf15 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -368,6 +368,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   }
 
   def saveAllCommittedFileInfosToDB(): Unit = {
+    saveCommittedFileInfosExecutor.shutdown()
     // save committed fileinfo to DB should be done within the time of saveCommittedFileInfoInterval
     saveCommittedFileInfosExecutor.awaitTermination(saveCommittedFileInfoInterval, MILLISECONDS)
     // graceful shutdown might be timed out, persist all committed fileinfos to DB
