This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 8f8b1b648 [CELEBORN-2086] S3FlushTask and OssFlushTask should close ByteArrayInputStream to avoid resource leak
8f8b1b648 is described below

commit 8f8b1b648571729ceec17c34b95a02ce8f0822b7
Author: SteNicholas <programg...@163.com>
AuthorDate: Tue Jul 29 17:19:18 2025 +0800

    [CELEBORN-2086] S3FlushTask and OssFlushTask should close ByteArrayInputStream to avoid resource leak
    
    ### What changes were proposed in this pull request?
    
    `S3FlushTask` and `OssFlushTask` should close `ByteArrayInputStream` to avoid a resource leak.
    
    ### Why are the changes needed?
    
    `S3FlushTask` and `OssFlushTask` currently never close the `ByteArrayInputStream` they create, which may cause a resource leak. The fix routes each flush through a close-guaranteeing helper in a new `DfsFlushTask` base class, as sketched below.
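    
    A minimal, self-contained Scala sketch of the loan-style close pattern the patch introduces (the `FlushSketch` object and the `println` stand-ins for `logWarning` and `putPart` are illustrative only, not part of the patch):
    
        import java.io.{ByteArrayInputStream, Closeable, IOException}
    
        object FlushSketch {
          // Run `block`, then always close `stream`; a failed close is only
          // logged so it cannot mask an exception thrown by `block`.
          def flush(stream: Closeable)(block: => Unit): Unit = {
            try {
              block
            } finally {
              try stream.close()
              catch {
                case e: IOException => println(s"Close flush dfs stream failed: $e")
              }
            }
          }
    
          def main(args: Array[String]): Unit = {
            val in = new ByteArrayInputStream(Array[Byte](1, 2, 3))
            flush(in) {
              // stand-in for s3MultipartUploader.putPart(in, partNumber, finalFlush)
              println(s"uploading ${in.available()} bytes")
            } // `in` is closed here even if the block threw
          }
        }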
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3395 from SteNicholas/CELEBORN-2086.
    
    Authored-by: SteNicholas <programg...@163.com>
    Signed-off-by: Shuang <lvshuang....@alibaba-inc.com>
    (cherry picked from commit 392f6186df564e522788b7e3a37a79733122986c)
    Signed-off-by: Shuang <lvshuang....@alibaba-inc.com>
---
 .../service/deploy/worker/storage/FlushTask.scala  | 52 ++++++++++++++++------
 1 file changed, 38 insertions(+), 14 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index e9c7f7490..b87b1d440 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -17,12 +17,13 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
-import java.io.ByteArrayInputStream
+import java.io.{ByteArrayInputStream, Closeable, IOException}
 import java.nio.channels.FileChannel
 
 import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
 import org.apache.hadoop.fs.Path
 
+import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.metrics.source.AbstractSource
 import org.apache.celeborn.common.protocol.StorageInfo.Type
 import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler
@@ -65,20 +66,39 @@ private[worker] class LocalFlushTask(
   }
 }
 
+abstract private[worker] class DfsFlushTask(
+    buffer: CompositeByteBuf,
+    notifier: FlushNotifier,
+    keepBuffer: Boolean,
+    source: AbstractSource) extends FlushTask(buffer, notifier, keepBuffer, source) with Logging {
+  def flush(stream: Closeable)(block: => Unit): Unit = {
+    try {
+      block
+    } finally {
+      try {
+        stream.close()
+      } catch {
+        case e: IOException => logWarning("Close flush dfs stream failed.", e)
+      }
+    }
+  }
+}
+
 private[worker] class HdfsFlushTask(
     buffer: CompositeByteBuf,
     val path: Path,
     notifier: FlushNotifier,
     keepBuffer: Boolean,
-    source: AbstractSource) extends FlushTask(buffer, notifier, keepBuffer, source) {
+    source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
   override def flush(): Unit = {
     val readableBytes = buffer.readableBytes()
     val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
     val hdfsStream = hadoopFs.append(path, 256 * 1024)
-    hdfsStream.write(ByteBufUtil.getBytes(buffer))
-    hdfsStream.close()
-    source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
-    source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+    flush(hdfsStream) {
+      hdfsStream.write(ByteBufUtil.getBytes(buffer))
+      source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
+      source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+    }
   }
 }
 
@@ -90,15 +110,17 @@ private[worker] class S3FlushTask(
     s3MultipartUploader: MultipartUploadHandler,
     partNumber: Int,
     finalFlush: Boolean = false)
-  extends FlushTask(buffer, notifier, keepBuffer, source) {
+  extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
 
   override def flush(): Unit = {
     val readableBytes = buffer.readableBytes()
     val bytes = ByteBufUtil.getBytes(buffer)
     val inputStream = new ByteArrayInputStream(bytes)
-    s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
-    source.incCounter(WorkerSource.S3_FLUSH_COUNT)
-    source.incCounter(WorkerSource.S3_FLUSH_SIZE, readableBytes)
+    flush(inputStream) {
+      s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
+      source.incCounter(WorkerSource.S3_FLUSH_COUNT)
+      source.incCounter(WorkerSource.S3_FLUSH_SIZE, readableBytes)
+    }
   }
 }
 
@@ -110,14 +132,16 @@ private[worker] class OssFlushTask(
     ossMultipartUploader: MultipartUploadHandler,
     partNumber: Int,
     finalFlush: Boolean = false)
-  extends FlushTask(buffer, notifier, keepBuffer, source) {
+  extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
 
   override def flush(): Unit = {
     val readableBytes = buffer.readableBytes()
     val bytes = ByteBufUtil.getBytes(buffer)
     val inputStream = new ByteArrayInputStream(bytes)
-    ossMultipartUploader.putPart(inputStream, partNumber, finalFlush)
-    source.incCounter(WorkerSource.OSS_FLUSH_COUNT)
-    source.incCounter(WorkerSource.OSS_FLUSH_SIZE, readableBytes)
+    flush(inputStream) {
+      ossMultipartUploader.putPart(inputStream, partNumber, finalFlush)
+      source.incCounter(WorkerSource.OSS_FLUSH_COUNT)
+      source.incCounter(WorkerSource.OSS_FLUSH_SIZE, readableBytes)
+    }
   }
 }
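
With this change, `HdfsFlushTask`, `S3FlushTask`, and `OssFlushTask` all route their streams through `DfsFlushTask.flush`, so an exception from `write` or `putPart` can no longer skip the `close()` call, and a failed close is only logged rather than masking the original error.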
