This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit 13530f859f44be5b3c998b015419177d46c5d335 Author: SteNicholas <[email protected]> AuthorDate: Tue Aug 19 14:57:16 2025 +0800 [CELEBORN-2119] DfsTierWriter should close s3MultipartUploadHandler and ossMultipartUploadHandler for close resource ### What changes were proposed in this pull request? `DfsTierWriter` should close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` for close resource to avoid resource leak for destroy file writer. ### Why are the changes needed? `DfsTierWriter` does not close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` in `closeResource`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI. Closes #3433 from SteNicholas/CELEBORN-2119. Authored-by: SteNicholas <[email protected]> Signed-off-by: SteNicholas <[email protected]> (cherry picked from commit adfc563828c1ab1459ad9d93d6f6743e6e0ebba5) Signed-off-by: SteNicholas <[email protected]> --- .../celeborn/service/deploy/worker/storage/TierWriter.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala index 29865ba39..4092336f3 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala @@ -666,7 +666,14 @@ class DfsTierWriter( override def notifyFileCommitted(): Unit = storageManager.notifyFileInfoCommitted(shuffleKey, filename, hdfsFileInfo) - override def closeResource(): Unit = {} + override def closeResource(): Unit = { + if (s3MultipartUploadHandler != null) { + s3MultipartUploadHandler.close() + } + if (ossMultipartUploadHandler != null) { + ossMultipartUploadHandler.close() + } + } override def cleanLocalOrDfsFiles(): Unit = { hdfsFileInfo.deleteAllFiles(hadoopFs)
