TheodoreLx commented on code in PR #3479:
URL: https://github.com/apache/celeborn/pull/3479#discussion_r2357450794
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1514,6 +1515,25 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
hardSplitIndexes: Array[Int] = Array.empty[Int]): Unit = {
val length = fileWriters.length
val result = new Array[StatusCode](length)
+
+ var finalBody: ByteBuf = body
Review Comment:
In the worker, the buffer read from the socket by Netty is moved to the
FileWriter's `flushBuffer`. When the `flushBuffer` reaches a certain size, a
FlushTask is generated and queued for flushing in the Flusher's
`workingQueues`. The current PR approach is to copy the buffer before moving it
to the FileWriter to improve internal buffer space utilization. This has
several advantages over copying before generating the FlushTask:
    1. It can take a long time for a `flushBuffer` to reach the size that
triggers a FlushTask, so the buffers accumulated in the `flushBuffer` remain
underutilized throughout that period. If there are thousands of FileWriters on
a worker, the data held in the flushBuffers across all FileWriters may total
over 1GB, while the memory actually allocated could be several times that
amount. The earlier the copy is performed, the earlier the underutilized
buffers can be released, improving memory utilization.
    2. In my initial solution, I performed the copy before generating the
FlushTask, but I observed high CPU consumption and sometimes a very high
`PrimaryPushDataTime`. Investigation revealed that when the received data is
mostly PushMergedData, the buffer corresponding to each batch must be copied,
because a PushMergedData may be composed of multiple batches. Netty has to
traverse its chunk queue for every buffer allocation, so the more batches a
single PushMergedData contains, the greater the traversal overhead. If the
copy is performed before splitting the PushMergedData, only a single buffer
allocation is required.
    3. In the current PR, after copying the buffer, `FileWriter.write` is
immediately called to add it to the `flushBuffer`, and the old buffer is
released as soon as the write method returns. Because the write method
executes very quickly, the window during which the old and new buffers
coexist in memory is negligible.
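    To make points 1 and 3 concrete, here is a minimal, self-contained sketch of the copy-early idea. It does not use Celeborn's or Netty's actual API: `PooledBuf`, `compact`, and all sizes below are hypothetical stand-ins, where `PooledBuf` models a Netty `ByteBuf` whose allocated capacity may far exceed its readable bytes, and "release" is modeled by simply dropping the reference to the original buffer.

```scala
// Hypothetical sketch of the copy-early pattern from points 1 and 3.
// PooledBuf stands in for a Netty ByteBuf: `capacity` is the memory the
// allocator reserved, `readable` is the data it actually holds.
final case class PooledBuf(capacity: Int, readable: Int)

object CopyEarlySketch {
  // Total memory reserved by a set of buffers vs. the data they hold.
  def allocated(bufs: Seq[PooledBuf]): Int = bufs.map(_.capacity).sum
  def held(bufs: Seq[PooledBuf]): Int = bufs.map(_.readable).sum

  // Copy a pooled buffer into an exact-size buffer. The caller drops the
  // original right after this call, so the window in which old and new
  // buffers coexist is only the duration of the copy (point 3).
  def compact(buf: PooledBuf): PooledBuf = PooledBuf(buf.readable, buf.readable)

  def main(args: Array[String]): Unit = {
    // e.g. socket reads that left 16 KiB pooled chunks holding ~1 KiB each
    val fromSocket = Seq.fill(1000)(PooledBuf(capacity = 16 * 1024, readable = 1024))
    val compacted = fromSocket.map(compact)
    // Same data held, far less memory reserved once the originals are released.
    println(s"before: held=${held(fromSocket)} allocated=${allocated(fromSocket)}")
    println(s"after:  held=${held(compacted)} allocated=${allocated(compacted)}")
  }
}
```

    The numbers above only illustrate the ratio the comment describes: the data held stays the same while the reserved memory drops to exactly the readable size.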
I agree with your suggestion to only copy when memory usage exceeds a
certain threshold. If memory usage is not high, there is indeed no need to copy
the buffer. I will develop this feature later.
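    The threshold-based variant discussed above could look roughly like the following sketch. The names `maybeCompact`, `memoryUsedRatio`, and `copyThreshold` are hypothetical and describe a possible shape for the future feature, not an existing Celeborn config or API.

```scala
// Hypothetical sketch of the planned follow-up: compact the incoming
// buffer only when memory pressure is high, otherwise skip the copy cost.
final case class Buf(capacity: Int, readable: Int)

object ConditionalCopySketch {
  // Copy into an exact-size buffer only when the fraction of memory in use
  // exceeds the threshold; otherwise keep the original pooled buffer.
  def maybeCompact(buf: Buf, memoryUsedRatio: Double, copyThreshold: Double): Buf =
    if (memoryUsedRatio > copyThreshold) Buf(buf.readable, buf.readable)
    else buf

  def main(args: Array[String]): Unit = {
    val buf = Buf(capacity = 16 * 1024, readable = 1024)
    // Under pressure: pay the copy to release the oversized pooled chunk.
    println(maybeCompact(buf, memoryUsedRatio = 0.9, copyThreshold = 0.8))
    // Plenty of headroom: keep the original and avoid the copy entirely.
    println(maybeCompact(buf, memoryUsedRatio = 0.5, copyThreshold = 0.8))
  }
}
```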
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]