TheodoreLx commented on code in PR #3479:
URL: https://github.com/apache/celeborn/pull/3479#discussion_r2357450794


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -1514,6 +1515,25 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
       hardSplitIndexes: Array[Int] = Array.empty[Int]): Unit = {
     val length = fileWriters.length
     val result = new Array[StatusCode](length)
+
+    var finalBody: ByteBuf = body

Review Comment:
   In the worker, the buffer read from the socket by Netty is moved to the 
FileWriter's `flushBuffer`. When the `flushBuffer` reaches a certain size, a 
FlushTask is generated and queued for flushing in the Flusher's 
`workingQueues`. The current PR approach is to copy the buffer before moving it 
to the FileWriter to improve internal buffer space utilization. This has 
several advantages over copying before generating the FlushTask:
   1.  Generating a FlushTask after the flushBuffer reaches a certain size can 
take a long time, meaning that during this time, the buffer in the flushBuffer 
is still underutilized. If there are thousands of FileWriters on a worker, the 
data size of the flushBuffers across all FileWriters could reach over 1GB, but 
the memory used could be several times that amount. The earlier the copy is 
performed, the earlier the underutilized buffer can be released, improving 
memory utilization.
   2.  In my initial solution, I did copy before generating the FlushTask, but 
I observed high CPU consumption and sometimes very high `PrimaryPushDataTime`. 
Investigation revealed that if the received data is mostly PushMergedData, 
because PushMergedData may be composed of multiple batches, the buffer 
corresponding to each batch must be copied. Netty needs to iterate through the 
chunk queue to request a buffer. The more batches in a single PushMergedData, 
the greater the buffer traversal overhead. If the copy is performed before 
splitting the PushMergedData, only a single buffer request is required.
   3.  In the current PR, after copying the buffer, `FileWriter.write` is 
immediately called to add it to the `flushBuffer`. After the write method 
completes, the old buffer is released. Because the write method executes very 
quickly, there is almost no chance of the old and new buffers existing 
simultaneously and consuming memory.
   
   I agree with your suggestion to only copy when memory usage exceeds a 
certain threshold. If memory usage is not high, there is indeed no need to copy 
the buffer. I will develop this feature later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to