This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit 047edf035a6dbb76d168ac6ae42d6344a9da0027 Author: TheodoreLx <[email protected]> AuthorDate: Fri Aug 8 13:57:21 2025 +0800 [CELEBORN-2085] Use a fixed buffer for flush copying to reduce GC ### What changes were proposed in this pull request? Allocate a byte array in advance and use it as a transfer buffer when copying is needed during flush ### Why are the changes needed? For HdfsFlushTask, OssFlushTask, and S3FlushTask, the CompositeByteBuf parameter must be copied to a byte array when flushing, and then the respective clients are used to write the byte array to the storage. When the flush throughput rate is very high, this copying will cause very serious GC problems and affect the performance of the worker ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? cluster test Closes #3394 from TheodoreLx/copy-on-flush. Authored-by: TheodoreLx <[email protected]> Signed-off-by: Shuang <[email protected]> (cherry picked from commit 1ead784fa1374bcfc1a8f8802e38fca48b1ee138) Signed-off-by: Shuang <[email protected]> --- .../org/apache/celeborn/common/CelebornConf.scala | 14 ++++++++ docs/configuration/worker.md | 1 + .../service/deploy/worker/storage/FlushTask.scala | 28 ++++++++++----- .../service/deploy/worker/storage/Flusher.scala | 41 ++++++++++++++++------ .../deploy/worker/storage/StorageManager.scala | 15 +++++--- .../storage/PartitionDataWriterSuiteUtils.java | 3 +- .../local/DiskMapPartitionDataWriterSuiteJ.java | 3 +- .../local/DiskReducePartitionDataWriterSuiteJ.java | 6 ++-- .../deploy/worker/storage/TierWriterSuite.scala | 3 +- 9 files changed, 87 insertions(+), 27 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index e17bc1c1f..e898b3a99 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -683,6 +683,8 
@@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def masterResourceConsumptionInterval: Long = get(MASTER_RESOURCE_CONSUMPTION_INTERVAL) def masterResourceConsumptionMetricsEnabled: Boolean = get(MASTER_RESOURCE_CONSUMPTION_METRICS_ENABLED) + def workerFlushReuseCopyBufferEnabled: Boolean = + get(WORKER_FLUSH_REUSE_COPY_BUFFER_ENABLED) def clusterName: String = get(CLUSTER_NAME) // ////////////////////////////////////////////////////// @@ -6555,4 +6557,16 @@ object CelebornConf extends Logging { .version("0.6.0") .booleanConf .createWithDefaultString("false") + + val WORKER_FLUSH_REUSE_COPY_BUFFER_ENABLED: ConfigEntry[Boolean] = + buildConf("worker.flush.reuseCopyBuffer.enabled") + .categories("worker") + .doc("Whether to enable reuse copy buffer for flush. Note that this copy buffer must not" + + " be referenced again after flushing. This means that, for example, the Hdfs(Oss or S3) client" + + " will not asynchronously access this buffer after the flush method returns, otherwise data" + + " modification problems will occur.") + .version("0.6.1") + .booleanConf + .createWithDefaultString("true") + } diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 8e9af6608..72343249e 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -193,4 +193,5 @@ license: | | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir | | celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file writer to close | 0.2.0 | | | celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a file writer to create if its creation was failed. | 0.2.0 | | +| worker.flush.reuseCopyBuffer.enabled | true | false | Whether to enable reuse copy buffer for flush. Note that this copy buffer must not be referenced again after flushing. 
This means that, for example, the Hdfs(Oss or S3) client will not asynchronously access this buffer after the flush method returns, otherwise data modification problems will occur. | 0.6.1 | | <!--end-include--> diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala index b87b1d440..5053456d4 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala @@ -34,7 +34,19 @@ abstract private[worker] class FlushTask( val notifier: FlushNotifier, val keepBuffer: Boolean, val source: AbstractSource) { - def flush(): Unit + def flush(copyBytes: Array[Byte]): Unit + + def convertBufferToBytes( + buffer: CompositeByteBuf, + copyBytes: Array[Byte], + length: Int): Array[Byte] = { + if (copyBytes != null && copyBytes.length >= length) { + buffer.readBytes(copyBytes, 0, length) + copyBytes + } else { + ByteBufUtil.getBytes(buffer) + } + } } private[worker] class LocalFlushTask( @@ -44,7 +56,7 @@ private[worker] class LocalFlushTask( keepBuffer: Boolean, source: AbstractSource, gatherApiEnabled: Boolean) extends FlushTask(buffer, notifier, keepBuffer, source) { - override def flush(): Unit = { + override def flush(copyBytes: Array[Byte]): Unit = { val readableBytes = buffer.readableBytes() val buffers = buffer.nioBuffers() if (gatherApiEnabled) { @@ -90,12 +102,12 @@ private[worker] class HdfsFlushTask( notifier: FlushNotifier, keepBuffer: Boolean, source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer, source) { - override def flush(): Unit = { + override def flush(copyBytes: Array[Byte]): Unit = { val readableBytes = buffer.readableBytes() val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS) val hdfsStream = hadoopFs.append(path, 256 * 1024) flush(hdfsStream) { - 
hdfsStream.write(ByteBufUtil.getBytes(buffer)) + hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes)) source.incCounter(WorkerSource.HDFS_FLUSH_COUNT) source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes) } @@ -112,9 +124,9 @@ private[worker] class S3FlushTask( finalFlush: Boolean = false) extends DfsFlushTask(buffer, notifier, keepBuffer, source) { - override def flush(): Unit = { + override def flush(copyBytes: Array[Byte]): Unit = { val readableBytes = buffer.readableBytes() - val bytes = ByteBufUtil.getBytes(buffer) + val bytes = convertBufferToBytes(buffer, copyBytes, readableBytes) val inputStream = new ByteArrayInputStream(bytes) flush(inputStream) { s3MultipartUploader.putPart(inputStream, partNumber, finalFlush) @@ -134,9 +146,9 @@ private[worker] class OssFlushTask( finalFlush: Boolean = false) extends DfsFlushTask(buffer, notifier, keepBuffer, source) { - override def flush(): Unit = { + override def flush(copyBytes: Array[Byte]): Unit = { val readableBytes = buffer.readableBytes() - val bytes = ByteBufUtil.getBytes(buffer) + val bytes = convertBufferToBytes(buffer, copyBytes, readableBytes) val inputStream = new ByteArrayInputStream(bytes) flush(inputStream) { ossMultipartUploader.putPart(inputStream, partNumber, finalFlush) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala index 1aa7bde6f..2a4134e79 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala @@ -41,7 +41,9 @@ abstract private[worker] class Flusher( val allocator: ByteBufAllocator, val maxComponents: Int, flushTimeMetric: TimeWindow, - mountPoint: String) extends Logging { + mountPoint: String, + val reuseCopyBuffer: Boolean, + val maxTaskSize: Long) extends Logging { protected lazy val 
flusherId: Int = System.identityHashCode(this) protected val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount) protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]() @@ -62,6 +64,10 @@ abstract private[worker] class Flusher( workers(index) = ThreadUtils.newDaemonSingleThreadExecutor(s"$this-$index") workers(index).submit(new Runnable { override def run(): Unit = { + var copyBytes: Array[Byte] = null + if (reuseCopyBuffer) { + copyBytes = new Array[Byte](maxTaskSize.toInt) + } while (!stopFlag.get()) { val task = workingQueues(index).take() val key = s"Flusher-$this-${Random.nextInt()}" @@ -70,7 +76,7 @@ abstract private[worker] class Flusher( try { val flushBeginTime = System.nanoTime() lastBeginFlushTime.set(index, flushBeginTime) - task.flush() + task.flush(copyBytes) if (flushTimeMetric != null) { val delta = System.nanoTime() - flushBeginTime flushTimeMetric.update(delta) @@ -141,13 +147,16 @@ private[worker] class LocalFlusher( maxComponents: Int, val mountPoint: String, val diskType: StorageInfo.Type, - timeWindow: TimeWindow) extends Flusher( + timeWindow: TimeWindow, + maxTaskSize: Long) extends Flusher( workerSource, threadCount, allocator, maxComponents, timeWindow, - mountPoint) + mountPoint, + false, + maxTaskSize) with DeviceObserver with Logging { deviceMonitor.registerFlusher(this) @@ -177,13 +186,17 @@ final private[worker] class HdfsFlusher( workerSource: AbstractSource, hdfsFlusherThreads: Int, allocator: ByteBufAllocator, - maxComponents: Int) extends Flusher( + maxComponents: Int, + reuseCopyBuffer: Boolean, + maxTaskSize: Long) extends Flusher( workerSource, hdfsFlusherThreads, allocator, maxComponents, null, - "HDFS") with Logging { + "HDFS", + reuseCopyBuffer, + maxTaskSize) with Logging { override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = { logError(s"$this write failed, reason $deviceErrorType ,exception: $e") @@ -196,13 +209,17 @@ final private[worker] class S3Flusher( 
workerSource: AbstractSource, s3FlusherThreads: Int, allocator: ByteBufAllocator, - maxComponents: Int) extends Flusher( + maxComponents: Int, + reuseCopyBuffer: Boolean, + maxTaskSize: Long) extends Flusher( workerSource, s3FlusherThreads, allocator, maxComponents, null, - "S3") with Logging { + "S3", + reuseCopyBuffer, + maxTaskSize) with Logging { override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = { logError(s"$this write failed, reason $deviceErrorType ,exception: $e") @@ -215,13 +232,17 @@ final private[worker] class OssFlusher( workerSource: AbstractSource, ossFlusherThreads: Int, allocator: ByteBufAllocator, - maxComponents: Int) extends Flusher( + maxComponents: Int, + reuseCopyBuffer: Boolean, + maxTaskSize: Long) extends Flusher( workerSource, ossFlusherThreads, allocator, maxComponents, null, - "OSS") with Logging { + "OSS", + reuseCopyBuffer, + maxTaskSize) with Logging { override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = { logError(s"$this write failed, reason $deviceErrorType ,exception: $e") diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index b64707c42..ddd9c6666 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -157,7 +157,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs conf.workerPushMaxComponents, diskInfo.mountPoint, diskInfo.storageType, - diskInfo.flushTimeMetrics) + diskInfo.flushTimeMetrics, + conf.workerFlusherBufferSize) flushers.put(diskInfo.mountPoint, flusher) totalThread = totalThread + diskInfo.threadCount } @@ -184,7 +185,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs workerSource, 
conf.workerHdfsFlusherThreads, storageBufferAllocator, - conf.workerPushMaxComponents)), + conf.workerPushMaxComponents, + conf.workerFlushReuseCopyBufferEnabled, + conf.workerHdfsFlusherBufferSize)), conf.workerHdfsFlusherThreads) } else { (None, 0) @@ -205,7 +208,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs workerSource, conf.workerS3FlusherThreads, storageBufferAllocator, - conf.workerPushMaxComponents)), + conf.workerPushMaxComponents, + conf.workerFlushReuseCopyBufferEnabled, + conf.workerS3FlusherBufferSize)), conf.workerS3FlusherThreads) } else { (None, 0) @@ -226,7 +231,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs workerSource, conf.workerOssFlusherThreads, storageBufferAllocator, - conf.workerPushMaxComponents)), + conf.workerPushMaxComponents, + conf.workerFlushReuseCopyBufferEnabled, + conf.workerOssFlusherBufferSize)), conf.workerOssFlusherThreads) } else { (None, 0) diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java index 918e2e27b..9d5c6c26a 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java @@ -195,7 +195,8 @@ public class PartitionDataWriterSuiteUtils { 256, "disk1", StorageInfo.Type.HDD, - null); + null, + celebornConf.workerFlusherBufferSize()); Mockito.doAnswer( invocation -> { if (callCounter.getAndIncrement() == 0) { diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java index 17392f464..87ed418a3 100644 --- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java @@ -91,7 +91,8 @@ public class DiskMapPartitionDataWriterSuiteJ { 256, "disk1", StorageInfo.Type.HDD, - null); + null, + CONF.workerFlusherBufferSize()); CelebornConf conf = new CelebornConf(); conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), "0.8"); diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java index 6fbbda71a..1faa04a55 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java @@ -119,7 +119,8 @@ public class DiskReducePartitionDataWriterSuiteJ { 256, "disk1", StorageInfo.Type.HDD, - null); + null, + CONF.workerFlusherBufferSize()); CelebornConf conf = new CelebornConf(); conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), "0.8"); @@ -442,7 +443,8 @@ public class DiskReducePartitionDataWriterSuiteJ { 256, "disk2", StorageInfo.Type.HDD, - null); + null, + CONF.workerFlusherBufferSize()); } @Test diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala index 068c351ab..0cd13f8a9 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala @@ -213,7 +213,8 @@ class TierWriterSuite extends AnyFunSuite with 
BeforeAndAfterEach { 256, "disk1", StorageInfo.Type.HDD, - null) + null, + celebornConf.workerFlusherBufferSize) val storageManager: StorageManager = Mockito.mock(classOf[StorageManager]) MemoryManager.initialize(celebornConf, storageManager, null)
