This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fc556531 [CELEBORN-2291] Support fsync on commit to ensure shuffle 
data durability
6fc556531 is described below

commit 6fc5565319c6622aef8637a5cf5fa348fc763745
Author: Kartikay Bhutani <[email protected]>
AuthorDate: Fri Mar 27 10:35:55 2026 +0800

    [CELEBORN-2291] Support fsync on commit to ensure shuffle data durability
    
    ### What changes were proposed in this pull request?
      Add a new configuration `celeborn.worker.commitFiles.fsync` (default 
`false`). When it is enabled, `LocalTierWriter.closeStreams()` calls 
`FileChannel.force(false)` (fdatasync) before closing the channel.
    
      ### Why are the changes needed?
    
      Without this, committed shuffle data can sit in the OS page cache before 
the kernel flushes it to disk. A hard crash in that window loses data even 
though Celeborn considers it committed. This option lets operators opt into 
stronger durability guarantees.
    
      ### Does this PR resolve a correctness bug?
    
      No. It adds an optional durability enhancement.
    
      ### Does this PR introduce _any_ user-facing change?
    
      Yes. New configuration key `celeborn.worker.commitFiles.fsync` (boolean, 
default `false`).
    
      ### How was this patch tested?
    
      Existing unit tests. The configuration was verified via 
`ConfigurationSuite`, and a new `LocalTierWriter` test with fsync enabled was 
added and run as part of `TierWriterSuite`.
    
    Additional context: 
[slack](https://apachecelebor-kw08030.slack.com/archives/C04B1FYS6SY/p1774259245973229)
    
    Closes #3635 from kaybhutani/kartikay/fsync-on-commit.
    
    Authored-by: Kartikay Bhutani <[email protected]>
    Signed-off-by: 子懿 <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 11 +++++++++++
 docs/configuration/worker.md                       |  1 +
 .../service/deploy/worker/storage/TierWriter.scala | 11 ++++++++++-
 .../deploy/worker/storage/TierWriterSuite.scala    | 23 ++++++++++++++++++++--
 4 files changed, 43 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index b1a4a469e..a71723c9a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -859,6 +859,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def workerCommitThreads: Int =
     if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else 
get(WORKER_COMMIT_THREADS)
   def workerCommitFilesCheckInterval: Long = 
get(WORKER_COMMIT_FILES_CHECK_INTERVAL)
+  def workerCommitFilesFsync: Boolean = get(WORKER_COMMIT_FILES_FSYNC)
   def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
   def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
   def maxPartitionSizeToEstimate: Long =
@@ -3770,6 +3771,16 @@ object CelebornConf extends Logging {
       .doc("Time length for a window about checking whether commit shuffle 
data files finished.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("100")
+  val WORKER_COMMIT_FILES_FSYNC: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.commitFiles.fsync")
+      .categories("worker")
+      .version("0.7.0")
+      .doc("Whether to fsync (fdatasync) shuffle data when committing. " +
+        "When enabled, each partition file is fsynced to disk before the 
commit completes " +
+        "ensuring committed data survives OS crashes, hard reboots etc. " +
+        "Enabling ensures durability but can add some latency to commit 
times.")
+      .booleanConf
+      .createWithDefault(false)
 
   val WORKER_CLEAN_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.worker.clean.threads")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 0adfabe67..bb2cec89b 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -63,6 +63,7 @@ license: |
 | celeborn.worker.clean.threads | 64 | false | Thread number of worker to 
clean up expired shuffle keys. | 0.3.2 |  | 
 | celeborn.worker.closeIdleConnections | false | false | Whether worker will 
close idle connections. | 0.2.0 |  | 
 | celeborn.worker.commitFiles.check.interval | 100 | false | Time length for a 
window about checking whether commit shuffle data files finished. | 0.6.0 |  | 
+| celeborn.worker.commitFiles.fsync | false | false | Whether to fsync 
(fdatasync) shuffle data when committing. When enabled, each partition file is 
fsynced to disk before the commit completes ensuring committed data survives OS 
crashes, hard reboots etc. Enabling ensures durability but can add some latency 
to commit times. | 0.7.0 |  | 
 | celeborn.worker.commitFiles.threads | 32 | false | Thread number of worker 
to commit shuffle data files asynchronously. It's recommended to set at least 
`128` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | 
celeborn.worker.commit.threads | 
 | celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn 
worker to commit files of a shuffle. It's recommended to set at least `240s` 
when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | 
celeborn.worker.shuffle.commit.timeout | 
 | celeborn.worker.congestionControl.check.interval | 10ms | false | Interval 
of worker checks congestion if celeborn.worker.congestionControl.enabled is 
true. | 0.3.2 |  | 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 10bf94455..a04f1d676 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -418,6 +418,7 @@ class LocalTierWriter(
     FileChannelUtils.createWritableFileChannel(diskFileInfo.getFilePath)
 
   val gatherApiEnabled: Boolean = conf.workerFlusherLocalGatherAPIEnabled
+  val commitFilesFsync: Boolean = conf.workerCommitFilesFsync
 
   override def needEvict(): Boolean = {
     false
@@ -458,7 +459,15 @@ class LocalTierWriter(
   }
 
   override def closeStreams(): Unit = {
-    channel.close()
+    if (channel != null) {
+      try {
+        if (commitFilesFsync) {
+          channel.force(false)
+        }
+      } finally {
+        channel.close()
+      }
+    }
   }
 
   override def notifyFileCommitted(): Unit =
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
index 43f999b29..ee6903ddf 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
@@ -175,8 +175,9 @@ class TierWriterSuite extends AnyFunSuite with 
BeforeAndAfterEach {
 
   }
 
-  private def prepareLocalTierWriter(rangeFilter: Boolean): LocalTierWriter = {
-    val celebornConf = new CelebornConf()
+  private def prepareLocalTierWriter(
+      rangeFilter: Boolean,
+      celebornConf: CelebornConf = new CelebornConf()): LocalTierWriter = {
     celebornConf.set("celeborn.worker.memoryFileStorage.maxFileSize", "80k")
     celebornConf.set("celeborn.client.shuffle.rangeReadFilter.enabled", 
rangeFilter.toString)
     val reduceFileMeta = new ReduceFileMeta(celebornConf.shuffleChunkSize)
@@ -314,4 +315,22 @@ class TierWriterSuite extends AnyFunSuite with 
BeforeAndAfterEach {
       
localTierWriter.fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getLastChunkOffset
 == 10240)
 
   }
+
+  test("test local tier writer with fsync enabled") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.commitFiles.fsync", "true")
+    val localTierWriter = prepareLocalTierWriter(false, conf)
+
+    assert(localTierWriter.commitFilesFsync === true)
+    for (i <- 1 to 10) {
+      localTierWriter.numPendingWrites.incrementAndGet()
+      localTierWriter.write(WriterUtils.generateSparkFormatData(
+        UnpooledByteBufAllocator.DEFAULT,
+        0))
+    }
+
+    val fileLen = localTierWriter.close()
+    assert(fileLen == 10240)
+    assert(localTierWriter.closed === true)
+  }
 }

Reply via email to