Copilot commented on code in PR #3635:
URL: https://github.com/apache/celeborn/pull/3635#discussion_r2993556535


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:
##########
@@ -458,6 +459,9 @@ class LocalTierWriter(
   }
 
   override def closeStreams(): Unit = {
+    if (commitFilesSync) {
+      channel.force(false)
+    }
     channel.close()

Review Comment:
   `channel.force(false)` can throw an IOException; if that happens, 
`channel.close()` will be skipped and the file descriptor may leak. Wrap the 
force/close sequence so the channel is always closed (e.g., `try { if 
(commitFilesSync) force } finally { close }`) and decide whether an fsync 
failure should fail the commit or only be logged.
   ```suggestion
       if (channel != null) {
         try {
           if (commitFilesSync) {
             channel.force(false)
           }
         } finally {
           try {
             channel.close()
           } catch {
             case e: IOException =>
               logWarning(
                 s"Close channel failed for file ${diskFileInfo.getFilePath} caused by ${e.getMessage}.")
           }
         }
       }
   ```
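
A self-contained sketch of the force-then-close ordering described above, for the case where an fsync failure should fail the commit rather than only be logged. `ForceThenClose`, `closeChannel`, and `writeAndClose` are hypothetical names for illustration and are not part of Celeborn; only `FileChannel.force` and `FileChannel.close` come from the JDK.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, Path, StandardOpenOption}

object ForceThenClose {
  // Hypothetical helper (not Celeborn code): fsync if requested, then always close.
  // An IOException from force() propagates to the caller after close() has run.
  def closeChannel(channel: FileChannel, sync: Boolean): Unit = {
    try {
      if (sync) {
        // force(false) flushes file content; metadata updates are not required.
        channel.force(false)
      }
    } finally {
      channel.close()
    }
  }

  // Hypothetical usage: write some bytes, then close through the helper.
  def writeAndClose(path: Path, bytes: Array[Byte], sync: Boolean): FileChannel = {
    val channel =
      FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
    val buf = ByteBuffer.wrap(bytes)
    while (buf.hasRemaining) channel.write(buf)
    closeChannel(channel, sync)
    channel
  }
}
```

One caveat with this shape: if `close()` also throws inside the `finally`, its exception replaces the one from `force()`. The suggestion above avoids that by catching and logging the close failure.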



##########
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala:
##########
@@ -314,4 +315,20 @@ class TierWriterSuite extends AnyFunSuite with BeforeAndAfterEach {
       localTierWriter.fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getLastChunkOffset == 10240)
 
   }
+
+  test("test local tier writer with fsync enabled") {
+    val conf = new CelebornConf()
+    conf.set("celeborn.worker.commitFiles.fsync", "true")
+    val localTierWriter = prepareLocalTierWriter(false, conf)

Review Comment:
   This test sets `celeborn.worker.commitFiles.fsync`, but the new 
`CelebornConf` entry is currently keyed as `celeborn.commitFiles.fsync`, so the 
test likely does not actually enable fsync and won’t catch regressions. Use the 
canonical config key and consider asserting that 
`localTierWriter.commitFilesSync` is true so the test verifies the flag is 
picked up.
   ```suggestion
       conf.set("celeborn.commitFiles.fsync", "true")
       val localTierWriter = prepareLocalTierWriter(false, conf)
       assert(localTierWriter.commitFilesSync === true)
   ```



##########
docs/configuration/worker.md:
##########
@@ -20,6 +20,7 @@ license: |
 | Key | Default | isDynamic | Description | Since | Deprecated |
 | --- | ------- | --------- | ----------- | ----- | ---------- |
 | celeborn.cluster.name | default | false | Celeborn cluster name. | 0.5.0 |  | 
+| celeborn.commitFiles.fsync | false | false | Whether to fsync (fdatasync) shuffle data when committing. When enabled, each partition file is fsynced to disk before the commit completes ensuring committed data survives OS crashes, hard reboots etc. | 0.7.0 |  | 

Review Comment:
   The documented key `celeborn.commitFiles.fsync` doesn’t match the worker 
commit-files naming pattern used elsewhere in this same table (e.g., 
`celeborn.worker.commitFiles.*`) and also differs from the key used in the new 
TierWriterSuite test (`celeborn.worker.commitFiles.fsync`). This inconsistency 
will cause operator confusion and can lead to the setting not being applied. 
Please update the key here to match the actual `CelebornConf` entry once you 
settle on the canonical name.
   ```suggestion
   | celeborn.worker.commitFiles.fsync | false | false | Whether to fsync (fdatasync) shuffle data when committing. When enabled, each partition file is fsynced to disk before the commit completes ensuring committed data survives OS crashes, hard reboots etc. | 0.7.0 |  | 
   ```



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -3770,6 +3771,15 @@ object CelebornConf extends Logging {
       .doc("Time length for a window about checking whether commit shuffle data files finished.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("100")
+  val WORKER_COMMIT_FILES_SYNC: ConfigEntry[Boolean] =
+    buildConf("celeborn.commitFiles.fsync")

Review Comment:
   The new config entry is declared as a worker setting 
(`WORKER_COMMIT_FILES_SYNC`, `workerCommitFilesFsync`) and sits next to other 
`celeborn.worker.commitFiles.*` keys, but the actual key string is 
`celeborn.commitFiles.fsync`. This is inconsistent with the existing naming and 
with the new test (which sets `celeborn.worker.commitFiles.fsync`), so 
users/tests may set the wrong key and the option won’t take effect. Please 
align on a single key name (likely `celeborn.worker.commitFiles.fsync`) and, if 
needed, add an alternative key for backward/typo compatibility, then update 
docs/tests accordingly.
   ```suggestion
       buildConf("celeborn.worker.commitFiles.fsync")
         .withAlternative("celeborn.commitFiles.fsync")
   ```
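
To illustrate the fallback behaviour this suggestion relies on, here is a minimal sketch that assumes `withAlternative` resolves the primary key first and consults the alternative only when the primary is unset. `AlternativeKeyLookup`, `BoolEntry`, and `read` are hypothetical stand-ins built on a plain `Map`; they are not `CelebornConf` classes.

```scala
object AlternativeKeyLookup {
  // Hypothetical stand-in for a boolean config entry (not CelebornConf's real type).
  final case class BoolEntry(key: String, alternatives: Seq[String], default: Boolean)

  // Look up the primary key first, then each alternative in order,
  // and fall back to the default when none of them is set.
  def read(settings: Map[String, String], entry: BoolEntry): Boolean = {
    val candidates = entry.key +: entry.alternatives
    candidates
      .collectFirst { case k if settings.contains(k) => settings(k) }
      .map(_.toBoolean)
      .getOrElse(entry.default)
  }
}
```

Under that assumption, a test that sets either `celeborn.worker.commitFiles.fsync` or `celeborn.commitFiles.fsync` would enable the flag, and the primary key wins when both are present.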



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
