This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new adfc56382 [CELEBORN-2119] DfsTierWriter should close
s3MultipartUploadHandler and ossMultipartUploadHandler for close resource
adfc56382 is described below
commit adfc563828c1ab1459ad9d93d6f6743e6e0ebba5
Author: SteNicholas <[email protected]>
AuthorDate: Tue Aug 19 14:57:16 2025 +0800
[CELEBORN-2119] DfsTierWriter should close s3MultipartUploadHandler and
ossMultipartUploadHandler for close resource
### What changes were proposed in this pull request?
`DfsTierWriter` should close `s3MultipartUploadHandler` and
`ossMultipartUploadHandler` in `closeResource` to avoid a resource leak
when the file writer is destroyed.
### Why are the changes needed?
`DfsTierWriter` does not close `s3MultipartUploadHandler` and
`ossMultipartUploadHandler` in `closeResource`.
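The null-guarded close pattern that this change relies on can be sketched in isolation. This is a minimal illustration only: `FakeUploadHandler`, `CloseResourceSketch`, and `closeIfPresent` are hypothetical names, and the sketch assumes the handlers expose a plain `close()` method, as the patch suggests. It is not the Celeborn implementation.

```scala
// Hypothetical stand-in for a multipart upload handler; not a Celeborn class.
final class FakeUploadHandler(val name: String) extends AutoCloseable {
  var closed: Boolean = false
  override def close(): Unit = {
    closed = true
  }
}

object CloseResourceSketch {
  // Close the handler only if it was ever initialized.
  // Option(null) evaluates to None, so a null handler is skipped safely.
  def closeIfPresent(handler: AutoCloseable): Unit =
    Option(handler).foreach(_.close())

  def main(args: Array[String]): Unit = {
    val s3Handler = new FakeUploadHandler("s3")
    // Assumption: a writer targeting S3 never initializes the OSS handler.
    val ossHandler: FakeUploadHandler = null

    closeIfPresent(s3Handler)
    closeIfPresent(ossHandler) // skipped, no NullPointerException

    println(s"s3 handler closed: ${s3Handler.closed}")
  }
}
```

The explicit `!= null` checks in the patch and the `Option(...)` wrapper here behave the same way; the wrapper only avoids repeating the guard for each handler.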
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3433 from SteNicholas/CELEBORN-2119.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../celeborn/service/deploy/worker/storage/TierWriter.scala | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index cd6bb19ae..23ee7f9d1 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -668,7 +668,14 @@ class DfsTierWriter(
   override def notifyFileCommitted(): Unit =
     storageManager.notifyFileInfoCommitted(shuffleKey, filename, dfsFileInfo)
 
-  override def closeResource(): Unit = {}
+  override def closeResource(): Unit = {
+    if (s3MultipartUploadHandler != null) {
+      s3MultipartUploadHandler.close()
+    }
+    if (ossMultipartUploadHandler != null) {
+      ossMultipartUploadHandler.close()
+    }
+  }
 
   override def cleanLocalOrDfsFiles(): Unit = {
     dfsFileInfo.deleteAllFiles(hadoopFs)