This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ead784fa [CELEBORN-2085] Use a fixed buffer for flush copying to 
reduce GC
1ead784fa is described below

commit 1ead784fa1374bcfc1a8f8802e38fca48b1ee138
Author: TheodoreLx <[email protected]>
AuthorDate: Fri Aug 8 13:57:21 2025 +0800

    [CELEBORN-2085] Use a fixed buffer for flush copying to reduce GC
    
    ### What changes were proposed in this pull request?
    
    Allocate a byte array in advance and reuse it as the intermediate buffer 
whenever a copy is needed during flush.
    
    ### Why are the changes needed?
    
    HdfsFlushTask, OssFlushTask, and S3FlushTask each have to copy the 
CompositeByteBuf they are given into a byte array when flushing, and then use 
their respective clients to write that byte array to storage.
    When flush throughput is very high, this copying causes severe GC 
pressure and degrades the performance of the worker.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    cluster test
    
    Closes #3394 from TheodoreLx/copy-on-flush.
    
    Authored-by: TheodoreLx <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 14 ++++++++
 docs/configuration/worker.md                       |  1 +
 .../service/deploy/worker/storage/FlushTask.scala  | 28 ++++++++++-----
 .../service/deploy/worker/storage/Flusher.scala    | 41 ++++++++++++++++------
 .../deploy/worker/storage/StorageManager.scala     | 15 +++++---
 .../storage/PartitionDataWriterSuiteUtils.java     |  3 +-
 .../local/DiskMapPartitionDataWriterSuiteJ.java    |  3 +-
 .../local/DiskReducePartitionDataWriterSuiteJ.java |  6 ++--
 .../deploy/worker/storage/TierWriterSuite.scala    |  3 +-
 9 files changed, 87 insertions(+), 27 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 9d7184c96..986d77e0f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -687,6 +687,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def masterResourceConsumptionInterval: Long = 
get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
   def masterResourceConsumptionMetricsEnabled: Boolean =
     get(MASTER_RESOURCE_CONSUMPTION_METRICS_ENABLED)
+  def workerFlushReuseCopyBufferEnabled: Boolean =
+    get(WORKER_FLUSH_REUSE_COPY_BUFFER_ENABLED)
   def clusterName: String = get(CLUSTER_NAME)
 
   // //////////////////////////////////////////////////////
@@ -6613,4 +6615,16 @@ object CelebornConf extends Logging {
       .version("0.6.0")
       .booleanConf
       .createWithDefaultString("false")
+
+  val WORKER_FLUSH_REUSE_COPY_BUFFER_ENABLED: ConfigEntry[Boolean] =
+    buildConf("worker.flush.reuseCopyBuffer.enabled")
+      .categories("worker")
+      .doc("Whether to enable reuse copy buffer for flush. Note that this copy 
buffer must not" +
+        " be referenced again after flushing. This means that, for example, 
the Hdfs(Oss or S3) client" +
+        " will not asynchronously access this buffer after the flush method 
returns, otherwise data" +
+        " modification problems will occur.")
+      .version("0.6.1")
+      .booleanConf
+      .createWithDefaultString("true")
+
 }
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8e9af6608..72343249e 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -193,4 +193,5 @@ license: |
 | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | 
Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir | 
 | celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file 
writer to close | 0.2.0 |  | 
 | celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a 
file writer to create if its creation was failed. | 0.2.0 |  | 
+| celeborn.worker.flush.reuseCopyBuffer.enabled | true | false | Whether to enable 
reuse copy buffer for flush. Note that this copy buffer must not be referenced 
again after flushing. This means that, for example, the Hdfs(Oss or S3) client 
will not asynchronously access this buffer after the flush method returns, 
otherwise data modification problems will occur. | 0.6.1 |  | 
 <!--end-include-->
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index b87b1d440..5053456d4 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -34,7 +34,19 @@ abstract private[worker] class FlushTask(
     val notifier: FlushNotifier,
     val keepBuffer: Boolean,
     val source: AbstractSource) {
-  def flush(): Unit
+  def flush(copyBytes: Array[Byte]): Unit
+
+  def convertBufferToBytes(
+      buffer: CompositeByteBuf,
+      copyBytes: Array[Byte],
+      length: Int): Array[Byte] = {
+    if (copyBytes != null && copyBytes.length >= length) {
+      buffer.readBytes(copyBytes, 0, length)
+      copyBytes
+    } else {
+      ByteBufUtil.getBytes(buffer)
+    }
+  }
 }
 
 private[worker] class LocalFlushTask(
@@ -44,7 +56,7 @@ private[worker] class LocalFlushTask(
     keepBuffer: Boolean,
     source: AbstractSource,
     gatherApiEnabled: Boolean) extends FlushTask(buffer, notifier, keepBuffer, 
source) {
-  override def flush(): Unit = {
+  override def flush(copyBytes: Array[Byte]): Unit = {
     val readableBytes = buffer.readableBytes()
     val buffers = buffer.nioBuffers()
     if (gatherApiEnabled) {
@@ -90,12 +102,12 @@ private[worker] class HdfsFlushTask(
     notifier: FlushNotifier,
     keepBuffer: Boolean,
     source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer, 
source) {
-  override def flush(): Unit = {
+  override def flush(copyBytes: Array[Byte]): Unit = {
     val readableBytes = buffer.readableBytes()
     val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
     val hdfsStream = hadoopFs.append(path, 256 * 1024)
     flush(hdfsStream) {
-      hdfsStream.write(ByteBufUtil.getBytes(buffer))
+      hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
       source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
       source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
     }
@@ -112,9 +124,9 @@ private[worker] class S3FlushTask(
     finalFlush: Boolean = false)
   extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
 
-  override def flush(): Unit = {
+  override def flush(copyBytes: Array[Byte]): Unit = {
     val readableBytes = buffer.readableBytes()
-    val bytes = ByteBufUtil.getBytes(buffer)
+    val bytes = convertBufferToBytes(buffer, copyBytes, readableBytes)
     val inputStream = new ByteArrayInputStream(bytes)
     flush(inputStream) {
       s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
@@ -134,9 +146,9 @@ private[worker] class OssFlushTask(
     finalFlush: Boolean = false)
   extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
 
-  override def flush(): Unit = {
+  override def flush(copyBytes: Array[Byte]): Unit = {
     val readableBytes = buffer.readableBytes()
-    val bytes = ByteBufUtil.getBytes(buffer)
+    val bytes = convertBufferToBytes(buffer, copyBytes, readableBytes)
     val inputStream = new ByteArrayInputStream(bytes)
     flush(inputStream) {
       ossMultipartUploader.putPart(inputStream, partNumber, finalFlush)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 1aa7bde6f..2a4134e79 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -41,7 +41,9 @@ abstract private[worker] class Flusher(
     val allocator: ByteBufAllocator,
     val maxComponents: Int,
     flushTimeMetric: TimeWindow,
-    mountPoint: String) extends Logging {
+    mountPoint: String,
+    val reuseCopyBuffer: Boolean,
+    val maxTaskSize: Long) extends Logging {
   protected lazy val flusherId: Int = System.identityHashCode(this)
   protected val workingQueues = new 
Array[LinkedBlockingQueue[FlushTask]](threadCount)
   protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
@@ -62,6 +64,10 @@ abstract private[worker] class Flusher(
       workers(index) = 
ThreadUtils.newDaemonSingleThreadExecutor(s"$this-$index")
       workers(index).submit(new Runnable {
         override def run(): Unit = {
+          var copyBytes: Array[Byte] = null
+          if (reuseCopyBuffer) {
+            copyBytes = new Array[Byte](maxTaskSize.toInt)
+          }
           while (!stopFlag.get()) {
             val task = workingQueues(index).take()
             val key = s"Flusher-$this-${Random.nextInt()}"
@@ -70,7 +76,7 @@ abstract private[worker] class Flusher(
                 try {
                   val flushBeginTime = System.nanoTime()
                   lastBeginFlushTime.set(index, flushBeginTime)
-                  task.flush()
+                  task.flush(copyBytes)
                   if (flushTimeMetric != null) {
                     val delta = System.nanoTime() - flushBeginTime
                     flushTimeMetric.update(delta)
@@ -141,13 +147,16 @@ private[worker] class LocalFlusher(
     maxComponents: Int,
     val mountPoint: String,
     val diskType: StorageInfo.Type,
-    timeWindow: TimeWindow) extends Flusher(
+    timeWindow: TimeWindow,
+    maxTaskSize: Long) extends Flusher(
     workerSource,
     threadCount,
     allocator,
     maxComponents,
     timeWindow,
-    mountPoint)
+    mountPoint,
+    false,
+    maxTaskSize)
   with DeviceObserver with Logging {
 
   deviceMonitor.registerFlusher(this)
@@ -177,13 +186,17 @@ final private[worker] class HdfsFlusher(
     workerSource: AbstractSource,
     hdfsFlusherThreads: Int,
     allocator: ByteBufAllocator,
-    maxComponents: Int) extends Flusher(
+    maxComponents: Int,
+    reuseCopyBuffer: Boolean,
+    maxTaskSize: Long) extends Flusher(
     workerSource,
     hdfsFlusherThreads,
     allocator,
     maxComponents,
     null,
-    "HDFS") with Logging {
+    "HDFS",
+    reuseCopyBuffer,
+    maxTaskSize) with Logging {
 
   override def processIOException(e: IOException, deviceErrorType: 
DiskStatus): Unit = {
     logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
@@ -196,13 +209,17 @@ final private[worker] class S3Flusher(
     workerSource: AbstractSource,
     s3FlusherThreads: Int,
     allocator: ByteBufAllocator,
-    maxComponents: Int) extends Flusher(
+    maxComponents: Int,
+    reuseCopyBuffer: Boolean,
+    maxTaskSize: Long) extends Flusher(
     workerSource,
     s3FlusherThreads,
     allocator,
     maxComponents,
     null,
-    "S3") with Logging {
+    "S3",
+    reuseCopyBuffer,
+    maxTaskSize) with Logging {
 
   override def processIOException(e: IOException, deviceErrorType: 
DiskStatus): Unit = {
     logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
@@ -215,13 +232,17 @@ final private[worker] class OssFlusher(
     workerSource: AbstractSource,
     ossFlusherThreads: Int,
     allocator: ByteBufAllocator,
-    maxComponents: Int) extends Flusher(
+    maxComponents: Int,
+    reuseCopyBuffer: Boolean,
+    maxTaskSize: Long) extends Flusher(
     workerSource,
     ossFlusherThreads,
     allocator,
     maxComponents,
     null,
-    "OSS") with Logging {
+    "OSS",
+    reuseCopyBuffer,
+    maxTaskSize) with Logging {
 
   override def processIOException(e: IOException, deviceErrorType: 
DiskStatus): Unit = {
     logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index bf3791343..9353ee23a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -149,7 +149,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           conf.workerPushMaxComponents,
           diskInfo.mountPoint,
           diskInfo.storageType,
-          diskInfo.flushTimeMetrics)
+          diskInfo.flushTimeMetrics,
+          conf.workerFlusherBufferSize)
         flushers.put(diskInfo.mountPoint, flusher)
         totalThread = totalThread + diskInfo.threadCount
       }
@@ -176,7 +177,9 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           workerSource,
           conf.workerHdfsFlusherThreads,
           storageBufferAllocator,
-          conf.workerPushMaxComponents)),
+          conf.workerPushMaxComponents,
+          conf.workerFlushReuseCopyBufferEnabled,
+          conf.workerHdfsFlusherBufferSize)),
         conf.workerHdfsFlusherThreads)
     } else {
       (None, 0)
@@ -197,7 +200,9 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           workerSource,
           conf.workerS3FlusherThreads,
           storageBufferAllocator,
-          conf.workerPushMaxComponents)),
+          conf.workerPushMaxComponents,
+          conf.workerFlushReuseCopyBufferEnabled,
+          conf.workerS3FlusherBufferSize)),
         conf.workerS3FlusherThreads)
     } else {
       (None, 0)
@@ -218,7 +223,9 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           workerSource,
           conf.workerOssFlusherThreads,
           storageBufferAllocator,
-          conf.workerPushMaxComponents)),
+          conf.workerPushMaxComponents,
+          conf.workerFlushReuseCopyBufferEnabled,
+          conf.workerOssFlusherBufferSize)),
         conf.workerOssFlusherThreads)
     } else {
       (None, 0)
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
index 918e2e27b..9d5c6c26a 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
@@ -195,7 +195,8 @@ public class PartitionDataWriterSuiteUtils {
             256,
             "disk1",
             StorageInfo.Type.HDD,
-            null);
+            null,
+            celebornConf.workerFlusherBufferSize());
     Mockito.doAnswer(
             invocation -> {
               if (callCounter.getAndIncrement() == 0) {
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
index 17392f464..87ed418a3 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
@@ -91,7 +91,8 @@ public class DiskMapPartitionDataWriterSuiteJ {
             256,
             "disk1",
             StorageInfo.Type.HDD,
-            null);
+            null,
+            CONF.workerFlusherBufferSize());
 
     CelebornConf conf = new CelebornConf();
     conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), 
"0.8");
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
index 6fbbda71a..1faa04a55 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
@@ -119,7 +119,8 @@ public class DiskReducePartitionDataWriterSuiteJ {
             256,
             "disk1",
             StorageInfo.Type.HDD,
-            null);
+            null,
+            CONF.workerFlusherBufferSize());
 
     CelebornConf conf = new CelebornConf();
     conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), 
"0.8");
@@ -442,7 +443,8 @@ public class DiskReducePartitionDataWriterSuiteJ {
             256,
             "disk2",
             StorageInfo.Type.HDD,
-            null);
+            null,
+            CONF.workerFlusherBufferSize());
   }
 
   @Test
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
index 068c351ab..0cd13f8a9 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
@@ -213,7 +213,8 @@ class TierWriterSuite extends AnyFunSuite with 
BeforeAndAfterEach {
       256,
       "disk1",
       StorageInfo.Type.HDD,
-      null)
+      null,
+      celebornConf.workerFlusherBufferSize)
     val storageManager: StorageManager = Mockito.mock(classOf[StorageManager])
 
     MemoryManager.initialize(celebornConf, storageManager, null)

Reply via email to