This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1ead784fa [CELEBORN-2085] Use a fixed buffer for flush copying to
reduce GC
1ead784fa is described below
commit 1ead784fa1374bcfc1a8f8802e38fca48b1ee138
Author: TheodoreLx <[email protected]>
AuthorDate: Fri Aug 8 13:57:21 2025 +0800
[CELEBORN-2085] Use a fixed buffer for flush copying to reduce GC
### What changes were proposed in this pull request?
Allocate a byte array in advance and use it as a transfer buffer whenever copying is
needed during flush.
### Why are the changes needed?
For HdfsFlushTask, OssFlushTask, and S3FlushTask, the CompositeByteBuf passed as a
parameter must be copied into a byte array when flushing, and the respective clients
then write that byte array to the storage.
When the flush throughput is very high, this copying causes severe GC pressure and
degrades the performance of the worker.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
cluster test
Closes #3394 from TheodoreLx/copy-on-flush.
Authored-by: TheodoreLx <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 14 ++++++++
docs/configuration/worker.md | 1 +
.../service/deploy/worker/storage/FlushTask.scala | 28 ++++++++++-----
.../service/deploy/worker/storage/Flusher.scala | 41 ++++++++++++++++------
.../deploy/worker/storage/StorageManager.scala | 15 +++++---
.../storage/PartitionDataWriterSuiteUtils.java | 3 +-
.../local/DiskMapPartitionDataWriterSuiteJ.java | 3 +-
.../local/DiskReducePartitionDataWriterSuiteJ.java | 6 ++--
.../deploy/worker/storage/TierWriterSuite.scala | 3 +-
9 files changed, 87 insertions(+), 27 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 9d7184c96..986d77e0f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -687,6 +687,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def masterResourceConsumptionInterval: Long =
get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
def masterResourceConsumptionMetricsEnabled: Boolean =
get(MASTER_RESOURCE_CONSUMPTION_METRICS_ENABLED)
+ def workerFlushReuseCopyBufferEnabled: Boolean =
+ get(WORKER_FLUSH_REUSE_COPY_BUFFER_ENABLED)
def clusterName: String = get(CLUSTER_NAME)
// //////////////////////////////////////////////////////
@@ -6613,4 +6615,16 @@ object CelebornConf extends Logging {
.version("0.6.0")
.booleanConf
.createWithDefaultString("false")
+
+ val WORKER_FLUSH_REUSE_COPY_BUFFER_ENABLED: ConfigEntry[Boolean] =
+ buildConf("worker.flush.reuseCopyBuffer.enabled")
+ .categories("worker")
+ .doc("Whether to enable reuse copy buffer for flush. Note that this copy
buffer must not" +
+ " be referenced again after flushing. This means that, for example,
the Hdfs(Oss or S3) client" +
+ " will not asynchronously access this buffer after the flush method
returns, otherwise data" +
+ " modification problems will occur.")
+ .version("0.6.1")
+ .booleanConf
+ .createWithDefaultString("true")
+
}
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8e9af6608..72343249e 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -193,4 +193,5 @@ license: |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false |
Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir |
| celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file
writer to close | 0.2.0 | |
| celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a
file writer to create if its creation was failed. | 0.2.0 | |
+| worker.flush.reuseCopyBuffer.enabled | true | false | Whether to enable
reuse copy buffer for flush. Note that this copy buffer must not be referenced
again after flushing. This means that, for example, the Hdfs(Oss or S3) client
will not asynchronously access this buffer after the flush method returns,
otherwise data modification problems will occur. | 0.6.1 | |
<!--end-include-->
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index b87b1d440..5053456d4 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -34,7 +34,19 @@ abstract private[worker] class FlushTask(
val notifier: FlushNotifier,
val keepBuffer: Boolean,
val source: AbstractSource) {
- def flush(): Unit
+ def flush(copyBytes: Array[Byte]): Unit
+
+ def convertBufferToBytes(
+ buffer: CompositeByteBuf,
+ copyBytes: Array[Byte],
+ length: Int): Array[Byte] = {
+ if (copyBytes != null && copyBytes.length >= length) {
+ buffer.readBytes(copyBytes, 0, length)
+ copyBytes
+ } else {
+ ByteBufUtil.getBytes(buffer)
+ }
+ }
}
private[worker] class LocalFlushTask(
@@ -44,7 +56,7 @@ private[worker] class LocalFlushTask(
keepBuffer: Boolean,
source: AbstractSource,
gatherApiEnabled: Boolean) extends FlushTask(buffer, notifier, keepBuffer,
source) {
- override def flush(): Unit = {
+ override def flush(copyBytes: Array[Byte]): Unit = {
val readableBytes = buffer.readableBytes()
val buffers = buffer.nioBuffers()
if (gatherApiEnabled) {
@@ -90,12 +102,12 @@ private[worker] class HdfsFlushTask(
notifier: FlushNotifier,
keepBuffer: Boolean,
source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer,
source) {
- override def flush(): Unit = {
+ override def flush(copyBytes: Array[Byte]): Unit = {
val readableBytes = buffer.readableBytes()
val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
val hdfsStream = hadoopFs.append(path, 256 * 1024)
flush(hdfsStream) {
- hdfsStream.write(ByteBufUtil.getBytes(buffer))
+ hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
}
@@ -112,9 +124,9 @@ private[worker] class S3FlushTask(
finalFlush: Boolean = false)
extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
- override def flush(): Unit = {
+ override def flush(copyBytes: Array[Byte]): Unit = {
val readableBytes = buffer.readableBytes()
- val bytes = ByteBufUtil.getBytes(buffer)
+ val bytes = convertBufferToBytes(buffer, copyBytes, readableBytes)
val inputStream = new ByteArrayInputStream(bytes)
flush(inputStream) {
s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
@@ -134,9 +146,9 @@ private[worker] class OssFlushTask(
finalFlush: Boolean = false)
extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
- override def flush(): Unit = {
+ override def flush(copyBytes: Array[Byte]): Unit = {
val readableBytes = buffer.readableBytes()
- val bytes = ByteBufUtil.getBytes(buffer)
+ val bytes = convertBufferToBytes(buffer, copyBytes, readableBytes)
val inputStream = new ByteArrayInputStream(bytes)
flush(inputStream) {
ossMultipartUploader.putPart(inputStream, partNumber, finalFlush)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 1aa7bde6f..2a4134e79 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -41,7 +41,9 @@ abstract private[worker] class Flusher(
val allocator: ByteBufAllocator,
val maxComponents: Int,
flushTimeMetric: TimeWindow,
- mountPoint: String) extends Logging {
+ mountPoint: String,
+ val reuseCopyBuffer: Boolean,
+ val maxTaskSize: Long) extends Logging {
protected lazy val flusherId: Int = System.identityHashCode(this)
protected val workingQueues = new
Array[LinkedBlockingQueue[FlushTask]](threadCount)
protected val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf]()
@@ -62,6 +64,10 @@ abstract private[worker] class Flusher(
workers(index) =
ThreadUtils.newDaemonSingleThreadExecutor(s"$this-$index")
workers(index).submit(new Runnable {
override def run(): Unit = {
+ var copyBytes: Array[Byte] = null
+ if (reuseCopyBuffer) {
+ copyBytes = new Array[Byte](maxTaskSize.toInt)
+ }
while (!stopFlag.get()) {
val task = workingQueues(index).take()
val key = s"Flusher-$this-${Random.nextInt()}"
@@ -70,7 +76,7 @@ abstract private[worker] class Flusher(
try {
val flushBeginTime = System.nanoTime()
lastBeginFlushTime.set(index, flushBeginTime)
- task.flush()
+ task.flush(copyBytes)
if (flushTimeMetric != null) {
val delta = System.nanoTime() - flushBeginTime
flushTimeMetric.update(delta)
@@ -141,13 +147,16 @@ private[worker] class LocalFlusher(
maxComponents: Int,
val mountPoint: String,
val diskType: StorageInfo.Type,
- timeWindow: TimeWindow) extends Flusher(
+ timeWindow: TimeWindow,
+ maxTaskSize: Long) extends Flusher(
workerSource,
threadCount,
allocator,
maxComponents,
timeWindow,
- mountPoint)
+ mountPoint,
+ false,
+ maxTaskSize)
with DeviceObserver with Logging {
deviceMonitor.registerFlusher(this)
@@ -177,13 +186,17 @@ final private[worker] class HdfsFlusher(
workerSource: AbstractSource,
hdfsFlusherThreads: Int,
allocator: ByteBufAllocator,
- maxComponents: Int) extends Flusher(
+ maxComponents: Int,
+ reuseCopyBuffer: Boolean,
+ maxTaskSize: Long) extends Flusher(
workerSource,
hdfsFlusherThreads,
allocator,
maxComponents,
null,
- "HDFS") with Logging {
+ "HDFS",
+ reuseCopyBuffer,
+ maxTaskSize) with Logging {
override def processIOException(e: IOException, deviceErrorType:
DiskStatus): Unit = {
logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
@@ -196,13 +209,17 @@ final private[worker] class S3Flusher(
workerSource: AbstractSource,
s3FlusherThreads: Int,
allocator: ByteBufAllocator,
- maxComponents: Int) extends Flusher(
+ maxComponents: Int,
+ reuseCopyBuffer: Boolean,
+ maxTaskSize: Long) extends Flusher(
workerSource,
s3FlusherThreads,
allocator,
maxComponents,
null,
- "S3") with Logging {
+ "S3",
+ reuseCopyBuffer,
+ maxTaskSize) with Logging {
override def processIOException(e: IOException, deviceErrorType:
DiskStatus): Unit = {
logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
@@ -215,13 +232,17 @@ final private[worker] class OssFlusher(
workerSource: AbstractSource,
ossFlusherThreads: Int,
allocator: ByteBufAllocator,
- maxComponents: Int) extends Flusher(
+ maxComponents: Int,
+ reuseCopyBuffer: Boolean,
+ maxTaskSize: Long) extends Flusher(
workerSource,
ossFlusherThreads,
allocator,
maxComponents,
null,
- "OSS") with Logging {
+ "OSS",
+ reuseCopyBuffer,
+ maxTaskSize) with Logging {
override def processIOException(e: IOException, deviceErrorType:
DiskStatus): Unit = {
logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index bf3791343..9353ee23a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -149,7 +149,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
conf.workerPushMaxComponents,
diskInfo.mountPoint,
diskInfo.storageType,
- diskInfo.flushTimeMetrics)
+ diskInfo.flushTimeMetrics,
+ conf.workerFlusherBufferSize)
flushers.put(diskInfo.mountPoint, flusher)
totalThread = totalThread + diskInfo.threadCount
}
@@ -176,7 +177,9 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
workerSource,
conf.workerHdfsFlusherThreads,
storageBufferAllocator,
- conf.workerPushMaxComponents)),
+ conf.workerPushMaxComponents,
+ conf.workerFlushReuseCopyBufferEnabled,
+ conf.workerHdfsFlusherBufferSize)),
conf.workerHdfsFlusherThreads)
} else {
(None, 0)
@@ -197,7 +200,9 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
workerSource,
conf.workerS3FlusherThreads,
storageBufferAllocator,
- conf.workerPushMaxComponents)),
+ conf.workerPushMaxComponents,
+ conf.workerFlushReuseCopyBufferEnabled,
+ conf.workerS3FlusherBufferSize)),
conf.workerS3FlusherThreads)
} else {
(None, 0)
@@ -218,7 +223,9 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
workerSource,
conf.workerOssFlusherThreads,
storageBufferAllocator,
- conf.workerPushMaxComponents)),
+ conf.workerPushMaxComponents,
+ conf.workerFlushReuseCopyBufferEnabled,
+ conf.workerOssFlusherBufferSize)),
conf.workerOssFlusherThreads)
} else {
(None, 0)
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
index 918e2e27b..9d5c6c26a 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
@@ -195,7 +195,8 @@ public class PartitionDataWriterSuiteUtils {
256,
"disk1",
StorageInfo.Type.HDD,
- null);
+ null,
+ celebornConf.workerFlusherBufferSize());
Mockito.doAnswer(
invocation -> {
if (callCounter.getAndIncrement() == 0) {
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
index 17392f464..87ed418a3 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
@@ -91,7 +91,8 @@ public class DiskMapPartitionDataWriterSuiteJ {
256,
"disk1",
StorageInfo.Type.HDD,
- null);
+ null,
+ CONF.workerFlusherBufferSize());
CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
"0.8");
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
index 6fbbda71a..1faa04a55 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
@@ -119,7 +119,8 @@ public class DiskReducePartitionDataWriterSuiteJ {
256,
"disk1",
StorageInfo.Type.HDD,
- null);
+ null,
+ CONF.workerFlusherBufferSize());
CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
"0.8");
@@ -442,7 +443,8 @@ public class DiskReducePartitionDataWriterSuiteJ {
256,
"disk2",
StorageInfo.Type.HDD,
- null);
+ null,
+ CONF.workerFlusherBufferSize());
}
@Test
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
index 068c351ab..0cd13f8a9 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
@@ -213,7 +213,8 @@ class TierWriterSuite extends AnyFunSuite with
BeforeAndAfterEach {
256,
"disk1",
StorageInfo.Type.HDD,
- null)
+ null,
+ celebornConf.workerFlusherBufferSize)
val storageManager: StorageManager = Mockito.mock(classOf[StorageManager])
MemoryManager.initialize(celebornConf, storageManager, null)