This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 8f8b1b648 [CELEBORN-2086] S3FlushTask and OssFlushTask should close ByteArrayInputStream to avoid resource leak
8f8b1b648 is described below
commit 8f8b1b648571729ceec17c34b95a02ce8f0822b7
Author: SteNicholas <[email protected]>
AuthorDate: Tue Jul 29 17:19:18 2025 +0800
    [CELEBORN-2086] S3FlushTask and OssFlushTask should close ByteArrayInputStream to avoid resource leak
### What changes were proposed in this pull request?
`S3FlushTask` and `OssFlushTask` should close `ByteArrayInputStream` to avoid a resource leak.
### Why are the changes needed?
`S3FlushTask` and `OssFlushTask` currently do not close the `ByteArrayInputStream` they create, which may cause a resource leak.
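
The fix wraps the upload in a helper that always closes the stream in a `finally` block. The following is a minimal, self-contained sketch of that idiom; the object and method names here are illustrative only, while the actual change lives in `DfsFlushTask` in the diff below:

```scala
import java.io.{ByteArrayInputStream, Closeable, IOException}

// Sketch of the close-in-finally idiom introduced by this change:
// run the upload logic, then always close the stream, logging rather
// than rethrowing a failure on close.
object FlushWithClose {
  def flush(stream: Closeable)(block: => Unit): Unit = {
    try {
      block
    } finally {
      try {
        stream.close()
      } catch {
        case e: IOException =>
          // The real DfsFlushTask uses logWarning from the Logging trait here.
          System.err.println(s"Close flush dfs stream failed: ${e.getMessage}")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream(Array[Byte](1, 2, 3))
    flush(in) {
      // Placeholder for the actual putPart/write call in the flush tasks.
      println(s"uploading ${in.available()} bytes")
    }
  }
}
```

Closing in `finally` keeps the stream from leaking even when the upload throws, and a failure on close is only logged so it cannot mask the original exception.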
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3395 from SteNicholas/CELEBORN-2086.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 392f6186df564e522788b7e3a37a79733122986c)
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/worker/storage/FlushTask.scala | 52 ++++++++++++++++------
1 file changed, 38 insertions(+), 14 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index e9c7f7490..b87b1d440 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -17,12 +17,13 @@
package org.apache.celeborn.service.deploy.worker.storage
-import java.io.ByteArrayInputStream
+import java.io.{ByteArrayInputStream, Closeable, IOException}
import java.nio.channels.FileChannel
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
import org.apache.hadoop.fs.Path
+import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.protocol.StorageInfo.Type
import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler
@@ -65,20 +66,39 @@ private[worker] class LocalFlushTask(
}
}
+abstract private[worker] class DfsFlushTask(
+ buffer: CompositeByteBuf,
+ notifier: FlushNotifier,
+ keepBuffer: Boolean,
+ source: AbstractSource) extends FlushTask(buffer, notifier, keepBuffer, source) with Logging {
+ def flush(stream: Closeable)(block: => Unit): Unit = {
+ try {
+ block
+ } finally {
+ try {
+ stream.close()
+ } catch {
+ case e: IOException => logWarning("Close flush dfs stream failed.", e)
+ }
+ }
+ }
+}
+
private[worker] class HdfsFlushTask(
buffer: CompositeByteBuf,
val path: Path,
notifier: FlushNotifier,
keepBuffer: Boolean,
- source: AbstractSource) extends FlushTask(buffer, notifier, keepBuffer, source) {
+ source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
override def flush(): Unit = {
val readableBytes = buffer.readableBytes()
val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
val hdfsStream = hadoopFs.append(path, 256 * 1024)
- hdfsStream.write(ByteBufUtil.getBytes(buffer))
- hdfsStream.close()
- source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
- source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+ flush(hdfsStream) {
+ hdfsStream.write(ByteBufUtil.getBytes(buffer))
+ source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
+ source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+ }
}
}
@@ -90,15 +110,17 @@ private[worker] class S3FlushTask(
s3MultipartUploader: MultipartUploadHandler,
partNumber: Int,
finalFlush: Boolean = false)
- extends FlushTask(buffer, notifier, keepBuffer, source) {
+ extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
override def flush(): Unit = {
val readableBytes = buffer.readableBytes()
val bytes = ByteBufUtil.getBytes(buffer)
val inputStream = new ByteArrayInputStream(bytes)
- s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
- source.incCounter(WorkerSource.S3_FLUSH_COUNT)
- source.incCounter(WorkerSource.S3_FLUSH_SIZE, readableBytes)
+ flush(inputStream) {
+ s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
+ source.incCounter(WorkerSource.S3_FLUSH_COUNT)
+ source.incCounter(WorkerSource.S3_FLUSH_SIZE, readableBytes)
+ }
}
}
@@ -110,14 +132,16 @@ private[worker] class OssFlushTask(
ossMultipartUploader: MultipartUploadHandler,
partNumber: Int,
finalFlush: Boolean = false)
- extends FlushTask(buffer, notifier, keepBuffer, source) {
+ extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
override def flush(): Unit = {
val readableBytes = buffer.readableBytes()
val bytes = ByteBufUtil.getBytes(buffer)
val inputStream = new ByteArrayInputStream(bytes)
- ossMultipartUploader.putPart(inputStream, partNumber, finalFlush)
- source.incCounter(WorkerSource.OSS_FLUSH_COUNT)
- source.incCounter(WorkerSource.OSS_FLUSH_SIZE, readableBytes)
+ flush(inputStream) {
+ ossMultipartUploader.putPart(inputStream, partNumber, finalFlush)
+ source.incCounter(WorkerSource.OSS_FLUSH_COUNT)
+ source.incCounter(WorkerSource.OSS_FLUSH_SIZE, readableBytes)
+ }
}
}