Copilot commented on code in PR #3635:
URL: https://github.com/apache/celeborn/pull/3635#discussion_r2993556535
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala:
##########
@@ -458,6 +459,9 @@ class LocalTierWriter(
}
override def closeStreams(): Unit = {
+ if (commitFilesSync) {
+ channel.force(false)
+ }
channel.close()
Review Comment:
`channel.force(false)` can throw an `IOException`; if it does,
`channel.close()` is skipped and the file descriptor may leak. Wrap the
force/close sequence so the channel is always closed (e.g.,
`try { if (commitFilesSync) force } finally { close }`), and decide whether an
fsync failure should fail the commit or only be logged.
```suggestion
if (channel != null) {
  try {
    if (commitFilesSync) {
      channel.force(false)
    }
  } finally {
    try {
      channel.close()
    } catch {
      case e: IOException =>
        logWarning(
          s"Close channel failed for file ${diskFileInfo.getFilePath} caused by ${e.getMessage}.")
    }
  }
}
```
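For the open question of whether an fsync failure should fail the commit, here is a minimal standalone sketch of the "fail the commit" option. It is not Celeborn code: `ForceThenClose` and `syncAndClose` are hypothetical names, and only `java.nio` calls are used. A failure from `force` propagates to the caller, the channel is still closed, and a secondary close failure is attached as a suppressed exception instead of masking the original one.

```scala
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}

object ForceThenClose {
  // Hypothetical helper, not part of Celeborn: optionally forces file
  // content to disk, then closes the channel whether or not force succeeded.
  def syncAndClose(channel: FileChannel, sync: Boolean): Unit = {
    var failure: Throwable = null
    try {
      if (sync) {
        // force(false) flushes file content; metadata updates are not required
        channel.force(false)
      }
    } catch {
      case e: IOException =>
        failure = e
        throw e // the sync failure reaches the caller, failing the commit
    } finally {
      try {
        channel.close()
      } catch {
        case e: IOException =>
          // Keep the sync failure as the primary error if there was one
          if (failure != null) failure.addSuppressed(e) else throw e
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("force-then-close", ".bin")
    val channel = FileChannel.open(path, StandardOpenOption.WRITE)
    channel.write(ByteBuffer.wrap("shuffle-bytes".getBytes("UTF-8")))
    syncAndClose(channel, sync = true)
    println(s"channel open after syncAndClose: ${channel.isOpen}")
    Files.delete(path)
  }
}
```

If a sync failure should only be logged, the `throw e` in the `catch` branch would be replaced by a log call; which behaviour is right for the worker is a decision for the PR author.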
##########
worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala:
##########
@@ -314,4 +315,20 @@ class TierWriterSuite extends AnyFunSuite with BeforeAndAfterEach {
localTierWriter.fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getLastChunkOffset
== 10240)
}
+
+ test("test local tier writer with fsync enabled") {
+ val conf = new CelebornConf()
+ conf.set("celeborn.worker.commitFiles.fsync", "true")
+ val localTierWriter = prepareLocalTierWriter(false, conf)
Review Comment:
This test sets `celeborn.worker.commitFiles.fsync`, but the new
`CelebornConf` entry is currently keyed as `celeborn.commitFiles.fsync`, so the
test likely does not actually enable fsync and won’t catch regressions. Use the
canonical config key and consider asserting that
`localTierWriter.commitFilesSync` is true so the test verifies the flag is
picked up.
```suggestion
conf.set("celeborn.commitFiles.fsync", "true")
val localTierWriter = prepareLocalTierWriter(false, conf)
assert(localTierWriter.commitFilesSync === true)
```
##########
docs/configuration/worker.md:
##########
@@ -20,6 +20,7 @@ license: |
| Key | Default | isDynamic | Description | Since | Deprecated |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.cluster.name | default | false | Celeborn cluster name. | 0.5.0 | |
+| celeborn.commitFiles.fsync | false | false | Whether to fsync (fdatasync) shuffle data when committing. When enabled, each partition file is fsynced to disk before the commit completes ensuring committed data survives OS crashes, hard reboots etc. | 0.7.0 | |
Review Comment:
The documented key `celeborn.commitFiles.fsync` doesn’t match the worker
commit-files naming pattern used elsewhere in this same table (e.g.,
`celeborn.worker.commitFiles.*`) and also differs from the key used in the new
TierWriterSuite test (`celeborn.worker.commitFiles.fsync`). This inconsistency
will cause operator confusion and can lead to the setting not being applied.
Please update the key here to match the actual `CelebornConf` entry once you
settle on the canonical name.
```suggestion
| celeborn.worker.commitFiles.fsync | false | false | Whether to fsync (fdatasync) shuffle data when committing. When enabled, each partition file is fsynced to disk before the commit completes ensuring committed data survives OS crashes, hard reboots etc. | 0.7.0 | |
```
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -3770,6 +3771,15 @@ object CelebornConf extends Logging {
.doc("Time length for a window about checking whether commit shuffle data files finished.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("100")
+ val WORKER_COMMIT_FILES_SYNC: ConfigEntry[Boolean] =
+ buildConf("celeborn.commitFiles.fsync")
Review Comment:
The new config entry is declared as a worker setting
(`WORKER_COMMIT_FILES_SYNC`, `workerCommitFilesFsync`) and sits next to other
`celeborn.worker.commitFiles.*` keys, but the actual key string is
`celeborn.commitFiles.fsync`. This is inconsistent with the existing naming and
with the new test (which sets `celeborn.worker.commitFiles.fsync`), so
users/tests may set the wrong key and the option won’t take effect. Please
align on a single key name (likely `celeborn.worker.commitFiles.fsync`) and, if
needed, add an alternative key for backward/typo compatibility, then update
docs/tests accordingly.
```suggestion
buildConf("celeborn.worker.commitFiles.fsync")
.withAlternative("celeborn.commitFiles.fsync")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.